问题描述
我尝试了以下代码:
import numpy as np
from dask.distributed import Client
from dask.jobqueue import PBSCluster
import dask.array as da
cluster = PBSCluster(processes=1,cores=8,memory="40GB",local directory='$TMPDIR',walltime='1:00:00',interface='ib0')
task = client.submit(lambda: np.random.uniform(size=(623000,73,41)))
arr = da.from_delayed(task,shape=(623000,41),dtype=np.float32)
res = client.compute(arr)
res.result()
过程还没有结束。它总是重新启动,并且仅由一名工人执行。
代码有什么问题? 是否可以将其分发到所有内核?
解决方法
cluster = PBSCluster(processes=1,cores=8,memory="40GB",
您正在请求具有 8 个内核的单个工作器,拥有多个工作器可能是个好主意。
task = client.submit(lambda: np.random.uniform(size=(623000,73,41)))
在这一行中,您要求工作人员创建大约 20 亿个数字的大数组。实现此目标的更好方法是使用 da.random.random
在多个工作人员之间分配工作。
res = client.compute(arr)
您要求工作人员计算 dask 数组,但这可能并不理想,更好的方法是要求工作人员计算一些通常会减小尺寸的感兴趣对象。