嵌套 dask 延迟或期货

问题描述

寻找嵌套并行作业的最佳实践。我无法嵌套 dask 延迟或期货,因此我将两者混合以使其正常工作。这不推荐吗?有没有更好的方法来做到这一点?示例:

import dask
from dask.distributed import Client
import random
import time
client = Client()

def rndSeries(x):
    time.sleep(1)
    return random.sample(range(1,50),x)

def sqNum(x):
    time.sleep(1)
    return x**2

def subProcess(li):
    results=[]
    for i in li:
        r = dask.delayed(sqNum)(i)
        results.append(r)
    return dask.compute(sum(results))[0]

futures=[]
for i in range(10):
    x = client.submit(rndSeries,random.randrange(5,10,1))
    y = client.submit(subProcess,x)
    futures.append(y)
client.gather(futures)

解决方法

考虑修改您的脚本以获得确定性的工作流程。如果从 1 个 worker 开始,您将看到该进程在 20 秒内完成(正如预期的那样,2 个进程 1 秒 + 6 个进程 3 秒)。如果你有 2 个 worker,执行时间会降到 10 秒。

import dask
from dask.distributed import Client,LocalCluster
import time
import numpy as np

cluster = LocalCluster(n_workers=1,threads_per_worker=1)
client = Client(cluster)

# if inside jupyter split the code below into a new cell
# to see accurate timing
%%time
def rndSeries(x):
    time.sleep(1)
    return np.random.rand()

def sqNum(x):
    time.sleep(3)
    return 1

def subProcess(li):
    results=[]
    li = [1,2,3]
    for i in li:
        r = dask.delayed(sqNum)(i)
        results.append(r)
    return dask.compute(sum(results))[0]

futures=[]
for i in range(2):
    x = client.submit(rndSeries,np.random.rand())
    y = client.submit(subProcess,x)
    futures.append(y)
client.gather(futures)

如果你有 6 个工人会怎样?执行时间现在是 4 秒(此任务可能的最短时间),因此似乎未来版本中 dask.compute() 的唯一缺点是它强制延迟的结果在单个工作器上。这在很多情况下可能没问题,但是,如果所有延迟任务的总资源需求超过单个 worker 的资源,那么最好的处理方式是从任务提交任务:https://distributed.dask.org/en/latest/task-launch.html