计算具有共同依赖关系的两个值时,Dask 内存使用率高

问题描述

我在一台机器上使用 daskLocalCluster 有 4 个进程、16 个线程、68.56GB 内存)并且在尝试一次计算两个共享依赖项的结果时遇到了工作内存问题。

>

在下面显示的示例中,仅使用一次计算即可计算 result 运行良好且快速,工作人员的合并内存使用量最大约为 1GB。但是,当使用两次计算计算 results 时,worker 会迅速使用所有内存并在总内存使用量约为 40GB 时开始写入磁盘。计算最终会完成,但是一旦开始写入磁盘,就会出现预期的大幅放缓。

直观地说,如果读入一个块,然后立即计算其两个总和,则可以丢弃该块并且内存使用率保持较低。但是,dask 似乎正在优先加载数据,而不是稍后清理内存的聚合计算。

如果您能帮助理解这里发生的事情,我们将不胜感激。如何计算具有共同依赖关系的两个结果,而无需两次读取底层数据或将其完全读入内存?

import dask
import dask.dataframe as dd
import dask.array as da
from dask.distributed import Client

client = Client("localhost:8786")

array = da.random.normal(size=(int(1e9),10),chunks=(int(1e6),10))
df = dd.from_array(array,columns=[str(i) for i in range(10)])

# does not blow up worker memory,overall usage stays below 1GB total
result = dask.compute(df["0"].sum())

# does blow up worker memory
results = dask.compute([df["0"].sum(),df["1"].sum()])

解决方法

数组的构造方式,每次创建一个块时,它必须生成数组的每一列。因此,优化的一个机会(如果可能)是以允许逐列处理的方式生成/加载数组。这将减少单个任务的内存负载。

优化的另一个场所是明确指定公共依赖项,例如 dask.compute(df[['0','1']].sum()) 将高效运行。

然而,更重要的一点是,默认情况下 dask 遵循一些关于如何优先处理工作的经验法则,see here。您有多种干预选项(不确定此列表是否详尽):自定义优先级、资源限制、修改计算图(允许工作人员从中间任务中释放内存,而无需等待最终任务完成)。

修改图形的一种简单方法是通过手动计算中间总和来分解最终总和数字与所有中间任务之间的依赖关系:

[results] = dask.compute([df["0"].map_partitions(sum),df["1"].map_partitions(sum)])

请注意,results 将是两个子列表的列表,但是计算每个子列表的总和很简单(尝试在延迟对象上运行 sum 会触发计算,因此更有效)在计算 sum 后运行 results)。