Dask:如何将延迟功能与工作人员资源一起使用?

问题描述

我想创建一个包含cpu和GPU任务的dask Delayed流。 GPU任务只能在GPU工作线程上运行,并且GPU工作线程只有一个GPU,并且一次只能处理一个GPU任务。

不幸的是,我看不到在Delayed API中指定辅助资源的方法

这是常见的代码

client = Client(resources={'GPU': 1})

@delayed
def fcpu(x,y):
    sleep(1)
    return x + y

@delayed
def fgpu(x,y):
    sleep(1)
    return x + y

这是用纯延迟形式编写的流程。该代码无法正常运行,因为它不了解GPU资源。

# STEP ONE: two parallel cpu tasks
a = fcpu(1,1)
b = fcpu(10,10)

# STEP TWO: two GPU tasks
c = fgpu(a,b)  # Requires 1 GPU
d = fgpu(a,b)  # Requires 1 GPU

# STEP THREE: final cpu task
e = fcpu(c,d)

%time e.compute()  # 3 seconds

这是我能想到的最好的解决方案。它将Delayed语法与Client.compute()期货结合在一起。它的行为似乎正确,但是非常丑陋。

# STEP ONE: two parallel cpu tasks
a = fcpu(1,10)
a_future,b_future = client.compute([a,b]) # Wo DON'T want a resource limit

# STEP TWO: two GPU tasks - only resources to run one at a time
c = fgpu(a_future,b_future)
d = fgpu(a_future,b_future)
c_future,d_future = client.compute([c,d],resources={'GPU': 1})

# STEP THREE: final cpu task
e = fcpu(c_future,d_future)
res = e.compute()

有更好的方法吗?

解决方法

也许类似于https://jobqueue.dask.org/en/latest/examples.html中描述的方法是在一台 GPU 机器或带有 SSD 的机器上处理的情况。

def step_1_w_single_GPU(data):
    return "Step 1 done for: %s" % data


def step_2_w_local_IO(data):
    return "Step 2 done for: %s" % data


stage_1 = [delayed(step_1_w_single_GPU)(i) for i in range(10)]
stage_2 = [delayed(step_2_w_local_IO)(s2) for s2 in stage_1]

result_stage_2 = client.compute(stage_2,resources={tuple(stage_1): {'GPU': 1},tuple(stage_2): {'ssdGB': 100}})