常规 Dask 和 dask.distributed 之间的关系

问题描述

我不明白常规 daskdask.distributed间的关系。

使用 dask.distributed,例如使用 Futures 接口,我必须显式创建一个由本地或远程集群支持的客户端,然后使用 client.submit() 提交给它。

使用常规的 dask,例如使用 Delayed 接口,我只在我的函数上使用 delayed()

delayed(或 compute)如何确定我的计算发生的位置?它背后一定有一些全局状态——但我将如何访问它?如果我理解正确,delayed 使用 dask.distributed 客户端(如果存在)。它是否使用类似

client = None
try:
    client = Client.current()
except ValueError:
    pass
if client is not None:
    # use client
else:
    # use default scheduler

如果是这样,为什么不对 submit 使用相同的逻辑?

client = None
try:
    client = Client.current()
except ValueError:
    pass
if client is not None:
    # use client
else:
    # fail because futures don't work on the default scheduler

最后,延迟物体和未来物体看起来非常相似。为什么第一个可以同时使用 dask.distributed 客户端和认调度程序,而期货需要 dask.distributed

解决方法

是的,有一些全局状态分配了当前客户端

https://github.com/dask/distributed/blob/f3f4bffea0640c01fc54f49c3219cf5807d14c66/distributed/client.py#L93

如果您对延迟对象调用 compute 方法,您最终将使用当前客户端

Dask 延迟只是构建计算图的语法糖。当您调用计算时,图形最终会通过分布式客户端进行分派。

future 是指集群上可能尚未计算的远程结果。延迟对象尚未提交到集群

@delayed
def func(x):
   return x
a = func(1)

在这种情况下,a 是一个延迟对象。该任务根本没有在集群上排队

future = client.compute(a,sync=False)

将任务提交到集群后,您将获得一个未来。

,

Dask 有多个后端。如果您不指定一个,则所有内容都在本地集群上运行,该集群的进程数与 CPU 中的内核数一样多。在定义集群(本地、Kubernetes、HPC、Spark)时,您可以准确指定您想要的内容。然而,客户端只看到的是什么以及它是如何执行的。

所有期货在您发送时都在您的后端执行,但您必须等待结果返回。与此同时,您可以在客户端上做其他事情。完成后,您可以使用 .result 获取结果。我还没有使用过期货 API,但它应该像 Python 并发期货一样工作。这也可能是您必须事先启动客户端的原因。 Dask 希望尽可能地镜像 API。 更多信息here

延迟、数据帧或数组 API 仅在您调用 .compute() 后将计算发送到后端。然后,您必须等待结果返回,并且不能在两者之间做任何事情。 更多信息here

,

future 不能在本地机器上使用(没有本地集群),因为它会立即触发计算,因此在同一代码中的任何进一步计算都将被阻止。 delayed 允许您将计算推迟到 DAG 形成。因此 delayed 可以在有或没有集群的单台机器上运行。