问题描述
我有一个 embarrassingly parallel workload,我正在读取一组镶木地板文件,将它们连接成更大的镶木地板文件,然后将其写回磁盘。我在一个分布式计算机(带有分布式文件系统)上运行它,大约有 300 个工人,每个工人有 20GB 的 RAM。每个单独的工作应该只消耗 2-3 GB 的 RAM,但不知何故工作人员由于内存错误而崩溃(获取:distributed.scheduler.KilledWorker 异常)。我可以在工作人员的输出日志中看到以下内容:
内存使用率很高,但工作人员没有数据可以存储到磁盘。可能 其他一些进程正在泄漏内存。进程内存:18.20 GB
with open('ts_files_list.txt','r') as f:
all_files = f.readlines()
# There are about 500K files
all_files = [f.strip() for f in all_files]
# grouping them into groups of 50.
# The concatenated df should be about 1GB in memory
npart = 10000
file_pieces = np.array_split(all_files,npart)
def read_and_combine(filenames,group_name):
dfs = [pd.read_parquet(f) for f in filenames]
grouped_df = pd.concat(dfs)
grouped_df.to_parquet(f,engine='pyarrow')
group_names = [f'group{i} for i in range(npart)]
delayed_func = dask.delayed(read_and_combine)
# the following line shouldn't have resulted in a memory error,but it does
dask.compute(map(delayed_func,file_pieces,group_names))
我在这里遗漏了什么明显的东西吗? 谢谢!
dask 版本:2021.01.0,pyarrow 版本:2.0.0,分布式版本:2021.01.0
解决方法
存在一些语法错误,但总体而言工作流程似乎合理。
with open('ts_files_list.txt','r') as f:
all_files = f.readlines()
all_files = [f.strip() for f in all_files]
npart = 10000
file_pieces = np.array_split(all_files,npart)
def read_and_combine(filenames,group_name):
grouped_df = pd.concat(pd.read_parquet(f) for f in filenames)
grouped_df.to_parquet(group_name,engine='pyarrow')
del grouped_df # this is optional (in principle dask should clean up these objects)
group_names = [f'group{i}' for i in range(npart)]
delayed_func = dask.delayed(read_and_combine)
dask.compute(map(delayed_func,file_pieces,group_names))
要记住的另一件事是 parquet
文件默认是压缩的,因此在解压缩时它们可能会占用比压缩文件大小更多的内存。不确定这是否适用于您的工作流程,但在遇到内存问题时需要记住一些事情。