问题描述
我想创建一个包含cpu和GPU任务的dask Delayed流。 GPU任务只能在GPU工作线程上运行,并且GPU工作线程只有一个GPU,并且一次只能处理一个GPU任务。
不幸的是,我看不到在Delayed API中指定辅助资源的方法。
这是常见的代码:
client = Client(resources={'GPU': 1})
@delayed
def fcpu(x,y):
sleep(1)
return x + y
@delayed
def fgpu(x,y):
sleep(1)
return x + y
这是用纯延迟形式编写的流程。该代码无法正常运行,因为它不了解GPU资源。
# STEP ONE: two parallel cpu tasks
a = fcpu(1,1)
b = fcpu(10,10)
# STEP TWO: two GPU tasks
c = fgpu(a,b) # Requires 1 GPU
d = fgpu(a,b) # Requires 1 GPU
# STEP THREE: final cpu task
e = fcpu(c,d)
%time e.compute() # 3 seconds
这是我能想到的最好的解决方案。它将Delayed语法与Client.compute()期货结合在一起。它的行为似乎正确,但是非常丑陋。
# STEP ONE: two parallel cpu tasks
a = fcpu(1,10)
a_future,b_future = client.compute([a,b]) # Wo DON'T want a resource limit
# STEP TWO: two GPU tasks - only resources to run one at a time
c = fgpu(a_future,b_future)
d = fgpu(a_future,b_future)
c_future,d_future = client.compute([c,d],resources={'GPU': 1})
# STEP THREE: final cpu task
e = fcpu(c_future,d_future)
res = e.compute()
有更好的方法吗?
解决方法
也许类似于https://jobqueue.dask.org/en/latest/examples.html中描述的方法是在一台 GPU 机器或带有 SSD 的机器上处理的情况。
def step_1_w_single_GPU(data):
return "Step 1 done for: %s" % data
def step_2_w_local_IO(data):
return "Step 2 done for: %s" % data
stage_1 = [delayed(step_1_w_single_GPU)(i) for i in range(10)]
stage_2 = [delayed(step_2_w_local_IO)(s2) for s2 in stage_1]
result_stage_2 = client.compute(stage_2,resources={tuple(stage_1): {'GPU': 1},tuple(stage_2): {'ssdGB': 100}})