进行令人尴尬的并行数据连接时,dask 中出现 KilledWorker 错误

问题描述

我有一个 embarrassingly parallel workload,我正在读取一组镶木地板文件,将它们连接成更大的镶木地板文件,然后将其写回磁盘。我在一个分布式计算机(带有分布式文件系统)上运行它,大约有 300 个工人,每个工人有 20GB 的 RAM。每个单独的工作应该只消耗 2-3 GB 的 RAM,但不知何故工作人员由于内存错误而崩溃(获取distributed.scheduler.KilledWorker 异常)。我可以在工作人员的输出日志中看到以下内容

内存使用率很高,但工作人员没有数据可以存储到磁盘。可能 其他一些进程正在泄漏内存。进程内存:18.20 GB

with open('ts_files_list.txt','r') as f:
    all_files = f.readlines()

# There are about 500K files
all_files = [f.strip() for f in all_files]

# grouping them into groups of 50. 
# The concatenated df should be about 1GB in memory
npart = 10000
file_pieces = np.array_split(all_files,npart)

def read_and_combine(filenames,group_name):
    dfs = [pd.read_parquet(f) for f in filenames]
    grouped_df = pd.concat(dfs)
    grouped_df.to_parquet(f,engine='pyarrow')

group_names = [f'group{i} for i in range(npart)]
delayed_func = dask.delayed(read_and_combine)

# the following line shouldn't have resulted in a memory error,but it does
dask.compute(map(delayed_func,file_pieces,group_names)) 

在这里遗漏了什么明显的东西吗? 谢谢!

dask 版本:2021.01.0,pyarrow 版本:2.0.0,分布式版本:2021.01.0

解决方法

存在一些语法错误,但总体而言工作流程似乎合理。

with open('ts_files_list.txt','r') as f:
    all_files = f.readlines()

all_files = [f.strip() for f in all_files]

npart = 10000
file_pieces = np.array_split(all_files,npart)

def read_and_combine(filenames,group_name):
    grouped_df = pd.concat(pd.read_parquet(f) for f in filenames)
    grouped_df.to_parquet(group_name,engine='pyarrow')
    del grouped_df # this is optional (in principle dask should clean up these objects)

group_names = [f'group{i}' for i in range(npart)]
delayed_func = dask.delayed(read_and_combine)

dask.compute(map(delayed_func,file_pieces,group_names))

要记住的另一件事是 parquet 文件默认是压缩的,因此在解压缩时它们可能会占用比压缩文件大小更多的内存。不确定这是否适用于您的工作流程,但在遇到内存问题时需要记住一些事情。