在 dask.compute(*something) 调用上显示进度 使用普通的 import random from time import sleep import dask from dask.diagnostics import ProgressBar from dask.distributed import Client, progress # simulate work @dask.delayed def work(x): sleep(x) ret

问题描述

我使用 dask代码具有以下结构:

@dask.delayed
def calculate(data):
    services = data.service_id
    prices = data.price
    
    return [services,prices]

output = []

for qid in notebook.tqdm(ids):
    r = calculate(parts[parts.quotation_id == qid])
    output.append(r)

事实证明,当我在 dask.compute() 列表上调用 output 方法时,我没有任何进度指示。诊断 UI 不会“捕获”此操作,我什至不确定它是否正常运行(从我的处理器使用情况来看,我认为不是)。

result = dask.compute(*output)

我正在关注 dask 文档中的“最佳实践”文章

https://docs.dask.org/en/latest/delayed-best-practices.html

我缺少什么?

编辑:我认为它正在运行,因为我仍然收到内存泄漏/高使用率警告。仍然没有进度指示。

解决方法

正如 related post 中所指出的,dict 有两种显示进度的方法:一种用于“正常”dask,另一种用于 dask

这是一个可重现的例子:

dask.distributed

使用普通的 import random from time import sleep import dask from dask.diagnostics import ProgressBar from dask.distributed import Client,progress # simulate work @dask.delayed def work(x): sleep(x) return True # generate tasks random.seed(42) tasks = [work(random.randint(1,5)) for x in range(50)]

dask

产生:

enter image description here

使用 ProgressBar().register() dask.compute(*tasks)

dask.distributed

产生:

enter image description here