问题描述
我一直试图在dask中合并32个带有公共id列的文件。总共文件大小为82.4GB。我迭代遍历文件,对某些文件进行外部合并,然后对其他文件进行左合并。我不确定是否需要设置分布式客户端。我还尝试将id列设置为索引,并使用它进行合并,但是它不起作用。我也尝试了熊猫,但遇到内存错误,即使我认为这可以放入RAM(524GB)。
问题在于,dask会创建大量的临时.partd文件,以至于它占用了950GB的可用空间。看起来并没有充分利用RAM。我怀疑工作流会创建许多临时文件,即使在未使用它们的过程中也不会清除它们。一旦脚本由于磁盘空间而失败并出错,则将它们清除。
任何将其改进的建议都会有所帮助!非常感谢!
代码如下:
#!/usr/bin/env python
import pandas as pd
import dask.dataframe as dd
if __name__ == '__main__':
list_df=pd.read_csv("path_2_files.csv")
for index,row in list_df.iterrows():
print("Starting "+str(index))
if (index==0):
ddf = dd.read_csv(row['path']+row['filename'],sep="\t",\
dtype={'id': 'object',\
'data1': 'float64',\
'data2': 'float64',\
'data3': 'float64' \
})
#Check for duplicates and drop
ddf=ddf.drop_duplicates()
print(len(ddf))
else:
ddf2 = dd.read_csv(row['path'] + row['filename'],\
dtype={'id': 'object',\
'data1': 'float64',\
'data2': 'float64',\
'data3': 'float64' \
})
#Check for duplicates and drop
ddf2=ddf2.drop_duplicates()
if (row['type']=='test1'):
ddf = dd.merge(ddf,ddf2,on='id',how="outer")
print(len(ddf))
del ddf2
elif (row['type']=='test2' or row['type']=='test3'):
ddf = dd.merge(ddf,how="left")
print(len(ddf))
del ddf2
ddf.to_parquet("s3://some/path/merged_file.parquet",object_encoding='utf8')
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)