问题描述
我有一个很大的作业队列,我想提前中止。 后来的作业依赖于较早的作业,因此我无法一次将所有作业排队。考虑以下 MWE:
from dask.distributed import Client,as_completed
import numpy as np
def work(_):
return np.random.random(size=(100_000,50))
def main(func):
with Client() as client:
futures = client.map(func,range(10),pure=False) # pre-determined work
ac = as_completed(futures,with_results=True)
for future,result in ac:
new_future = client.submit(func,pure=False) # work depends on earlier output
ac.add(new_future)
break # Some condition is met,remaining jobs are irrelevant & can be aborted/discarded
ac.clear()
if __name__ == '__main__':
main(work)
上面的例子一般会产生如下错误:
distributed.scheduler - ERROR - Workers don't have promised key: ['tcp://127.0.0.1:58617'],work-24a67a6d-4479-4f62-9865-bd48442198c4
nonetype: None
distributed.client - WARNING - Couldn't gather 1 keys,rescheduling {'work-24a67a6d-4479-4f62-9865-bd48442198c4': ('tcp://127.0.0.1:58617',)}
我希望 as_completed.clear
能够干净利落地处理剩余的期货。
我还做了一个变化来跟踪所有期货,并在调用 as_completed.clear
之前取消它们,但它产生了类似的结果。
是否有适当的方法来实现这种预期行为?
注意事项:
- 如果
work
返回None
,问题似乎仍然发生,只是频率降低了。 - 我在我的 Windows 机器上测试了这个,并从一个 Ubuntu docker 容器中测试。
- 我使用了 Python 3.8.3 和 dask/distributed 2020.12.0
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)