问题描述
寻找嵌套并行作业的最佳实践。我无法嵌套 dask 延迟或期货,因此我将两者混合以使其正常工作。这不推荐吗?有没有更好的方法来做到这一点?示例:
import dask
from dask.distributed import Client
import random
import time
client = Client()
def rndSeries(x):
time.sleep(1)
return random.sample(range(1,50),x)
def sqNum(x):
time.sleep(1)
return x**2
def subProcess(li):
results=[]
for i in li:
r = dask.delayed(sqNum)(i)
results.append(r)
return dask.compute(sum(results))[0]
futures=[]
for i in range(10):
x = client.submit(rndSeries,random.randrange(5,10,1))
y = client.submit(subProcess,x)
futures.append(y)
client.gather(futures)
解决方法
考虑修改您的脚本以获得确定性的工作流程。如果从 1 个 worker 开始,您将看到该进程在 20 秒内完成(正如预期的那样,2 个进程 1 秒 + 6 个进程 3 秒)。如果你有 2 个 worker,执行时间会降到 10 秒。
import dask
from dask.distributed import Client,LocalCluster
import time
import numpy as np
cluster = LocalCluster(n_workers=1,threads_per_worker=1)
client = Client(cluster)
# if inside jupyter split the code below into a new cell
# to see accurate timing
%%time
def rndSeries(x):
time.sleep(1)
return np.random.rand()
def sqNum(x):
time.sleep(3)
return 1
def subProcess(li):
results=[]
li = [1,2,3]
for i in li:
r = dask.delayed(sqNum)(i)
results.append(r)
return dask.compute(sum(results))[0]
futures=[]
for i in range(2):
x = client.submit(rndSeries,np.random.rand())
y = client.submit(subProcess,x)
futures.append(y)
client.gather(futures)
如果你有 6 个工人会怎样?执行时间现在是 4 秒(此任务可能的最短时间),因此似乎未来版本中 dask.compute()
的唯一缺点是它强制延迟的结果在单个工作器上。这在很多情况下可能没问题,但是,如果所有延迟任务的总资源需求超过单个 worker 的资源,那么最好的处理方式是从任务提交任务:https://distributed.dask.org/en/latest/task-launch.html