Dask 延迟总和被杀死但有足够的资源

问题描述

我正在创建一个读取整个文件夹的函数,创建一个 dask 数据帧,然后处理该数据帧的分区并对结果求和,如下所示:

import dask.dataframe as dd
from dask import delayed,compute

def partitions_func(folder):
    df = dd.read_csv(f'{folder}/*.csv')
    partial_results = []
    for partition in df.partitions:
        partial = another_function(partition)
        partial_results.append(partial)
    total = delayed(sum)(partial_results)
    return total

partitions_func (another_function) 中调用函数也被延迟了。

@delayed
def another_function(partition):
    # Partition processing
    return result

我检查过,处理过程中创建的变量都很小,所以它们应该不会引起任何问题。分区可以很大,但不能大于可用 RAM。

当我执行 partitions_func(folder) 时,进程被杀死。起初,我认为问题与有两个 delayed 有关,一个another_function 上,一个delayed(sum) 上。

delayed 中移除 another_function 装饰器会导致问题,因为参数是 dask 数据帧,您无法执行 tolist() 之类的操作。我尝试从 delayed删除 sum,因为我认为这可能是并行化和可用资源的问题,但该进程也会被终止。

但是,我知道有 5 个分区。如果我从 total = delayed(sum)(partial_results)删除语句 partitions_func 并改为“手动”计算总和,则一切都按预期进行:

total = partial_results[0].compute() + partial_results[1].compute() + partial_results[2].compute() \
        + partial_results[3].compute() + partial_results[4].compute()

谢谢!

解决方法

Dask 数据帧会创建一系列延迟对象,因此当您调用延迟函数 another_function 时,它会变成嵌套延迟,而 dask.compute 将无法处理它。一种选择是使用.map_partitions(),典型的例子是df.map_partitions(len).compute(),它将计算每个分区的长度。因此,如果您可以重写 another_function 以接受 Pandas 数据帧,并移除延迟装饰器,那么您的代码将大致如下所示:

df = dd.read_csv(f'{folder}/*.csv')
total = df.map_partitions(another_function)

现在 total 是一个延迟对象,您可以将其传递给 dask.compute(或简单地运行 total = df.map_partitions(another_function).compute())。