Dask计算永无止境

问题描述

我尝试了以下代码

import numpy as np  
from dask.distributed import Client  
from dask.jobqueue import PBSCluster  
import dask.array as da

cluster = PBSCluster(processes=1,cores=8,memory="40GB",local directory='$TMPDIR',walltime='1:00:00',interface='ib0')  

task = client.submit(lambda: np.random.uniform(size=(623000,73,41)))
arr = da.from_delayed(task,shape=(623000,41),dtype=np.float32)
res = client.compute(arr)

res.result()

过程还没有结束。它总是重新启动,并且仅由一名工人执行。

代码有什么问题? 是否可以将其分发到所有内核?

解决方法

cluster = PBSCluster(processes=1,cores=8,memory="40GB",

您正在请求具有 8 个内核的单个工作器,拥有多个工作器可能是个好主意。

task = client.submit(lambda: np.random.uniform(size=(623000,73,41)))

在这一行中,您要求工作人员创建大约 20 亿个数字的大数组。实现此目标的更好方法是使用 da.random.random 在多个工作人员之间分配工作。

res = client.compute(arr)

您要求工作人员计算 dask 数组,但这可能并不理想,更好的方法是要求工作人员计算一些通常会减小尺寸的感兴趣对象。