问题描述
我有一个包含 bz2 压缩文件的 tarfile。我想将函数 clean_file
应用于每个 bz2 文件,并整理结果。在系列中,这很容易使用循环:
import pandas as pd
import json
import os
import bz2
import itertools
import datetime
import tarfile
from multiprocessing import Pool
def clean_file(member):
if '.bz2' in str(member):
f = tr.extractfile(member)
with bz2.open(f,"rt") as bzinput:
dicts = []
for i,line in enumerate(bzinput):
line = line.replace('"name"}','"name":" "}')
dat = json.loads(line)
dicts.append(dat)
bzinput.close()
f.close()
del f,bzinput
processed = dicts[0]
return processed
else:
pass
# Open tar file and get contents (members)
tr = tarfile.open('data.tar')
members = tr.getmembers()
num_files = len(members)
# Apply the clean_file function in series
i=0
processed_files = []
for m in members:
processed_files.append(clean_file(m))
i+=1
print('done '+str(i)+'/'+str(num_files))
但是,我需要能够并行执行此操作。我正在尝试的方法使用 Pool
像这样:
# Apply the clean_file function in parallel
if __name__ == '__main__':
with Pool(2) as p:
processed_files = list(p.map(clean_file,members))
但这会返回一个 OSError:
Traceback (most recent call last):
File "/Users/johnfoley/opt/anaconda3/envs/racing_env/lib/python3.6/multiprocessing/pool.py",line 119,in worker
result = (True,func(*args,**kwds))
File "parse_data.py",line 19,in clean_file
for i,line in enumerate(bzinput):
File "/Users/johnfoley/opt/anaconda3/envs/racing_env/lib/python3.6/bz2.py",line 195,in read1
return self._buffer.read1(size)
File "/Users/johnfoley/opt/anaconda3/envs/racing_env/lib/python3.6/_compression.py",line 68,in readinto
data = self.read(len(byte_view))
File "/Users/johnfoley/opt/anaconda3/envs/racing_env/lib/python3.6/_compression.py",line 103,in read
data = self._decompressor.decompress(rawblock,size)
OSError: Invalid data stream
"""
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "parse_data.py",line 53,in <module>
processed_files = list(tqdm.tqdm(p.imap(clean_file,members),total=num_files))
File "/Users/johnfoley/opt/anaconda3/envs/racing_env/lib/python3.6/site-packages/tqdm/std.py",line 1167,in __iter__
for obj in iterable:
File "/Users/johnfoley/opt/anaconda3/envs/racing_env/lib/python3.6/multiprocessing/pool.py",line 735,in next
raise value
OSError: Invalid data stream
所以我想这种方式不能正确访问 data.tar 或其他内容中的文件。如何并行应用该函数?
我猜这将适用于任何包含 bz2 文件的 tar 存档,但这是我重现错误的数据: https://github.com/johnf1004/reproduce_tar_error
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)