使用“as_completed”时,如何正确清除“dask.distributed”中的未完成期货?

问题描述

我有一个很大的作业队列,我想提前中止。 后来的作业依赖于较早的作业,因此我无法一次将所有作业排队。考虑以下 MWE:

from dask.distributed import Client,as_completed
import numpy as np


def work(_):
    return np.random.random(size=(100_000,50))


def main(func):
    with Client() as client:
        futures = client.map(func,range(10),pure=False)    # pre-determined work
        ac = as_completed(futures,with_results=True)
        for future,result in ac:
            new_future = client.submit(func,pure=False)  # work depends on earlier output
            ac.add(new_future)
            break # Some condition is met,remaining jobs are irrelevant & can be aborted/discarded
        ac.clear()


if __name__ == '__main__':
    main(work)

上面的例子一般会产生如下错误

distributed.scheduler - ERROR - Workers don't have promised key: ['tcp://127.0.0.1:58617'],work-24a67a6d-4479-4f62-9865-bd48442198c4
nonetype: None
distributed.client - WARNING - Couldn't gather 1 keys,rescheduling {'work-24a67a6d-4479-4f62-9865-bd48442198c4': ('tcp://127.0.0.1:58617',)}

我希望 as_completed.clear 能够干净利落地处理剩余的期货。 我还做了一个变化来跟踪所有期货,并在调用 as_completed.clear 之前取消它们,但它产生了类似的结果。

是否有适当的方法来实现这种预期行为?

注意事项:

  • 如果 work 返回 None,问题似乎仍然发生,只是频率降低了。
  • 我在我的 Windows 机器上测试了这个,并从一个 Ubuntu docker 容器中测试。
  • 我使用了 Python 3.8.3 和 dask/distributed 2020.12.0

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)