问题描述
我有2个文本文件(* .txt),其中包含以下格式的唯一字符串:
udtvbacfbbxfdffzpwsqzxyznecbqxgebuudzgzn:refmfxaawuuilznjrxuogrjqhlmhslkmprdxbascpoxda
ltswbjfsnejkaxyzwyjyfggjynndwkivegqdarjg:qyktyzugbgclpovyvmgtkihxqisuawesmcvsjzukcbrzi
第一个文件包含 5000万行(4.3 GB),第二个文件包含 100万行(112 MB)。一行包含40个字符,定界符:和另外45个字符。
任务:获取两个文件的唯一值。即,您需要一个 csv或txt 文件,其中第二行中有行,而第二行中没有行首先。
我正在尝试使用 vaex (Vaex):
import vaex
base_files = ['file1.txt']
for i,txt_file in enumerate(base_files,1):
for j,dv in enumerate(vaex.from_csv(txt_file,chunk_size=5_000_000,names=['data']),1):
dv.export_hdf5(f'hdf5_base/base_{i:02}_{j:02}.hdf5')
check_files = ['file2.txt']
for i,txt_file in enumerate(check_files,1):
dv.export_hdf5(f'hdf5_check/check_{i:02}_{j:02}.hdf5')
dv_base = vaex.open('hdf5_base/*.hdf5')
dv_check = vaex.open('hdf5_check/*.hdf5')
dv_result = dv_check.join(dv_base,on='data',how='inner',inplace=True)
dv_result.export(path='result.csv')
结果,我得到了具有唯一行值的 result.csv 文件。但是验证过程需要很长时间。此外,它使用所有可用的RAM和所有处理器资源。 如何加快这一过程?我究竟做错了什么?有什么可以做得更好的?值得使用其他库(pandas,dask)进行检查吗?它们会更快吗?
UPD 10.11.2020 到目前为止,我还没有找到比以下选项更快的东西:
from io import StringIO
def read_lines(filename):
handle = StringIO(filename)
for line in handle:
yield line.rstrip('\n')
def read_in_chunks(file_obj,chunk_size=10485760):
while True:
data = file_obj.read(chunk_size)
if not data:
break
yield data
file_check = open('check.txt','r',errors='ignore').read()
check_set = {elem for elem in read_lines(file_check)}
with open(file='base.txt',mode='r',errors='ignore') as file_base:
for idx,chunk in enumerate(read_in_chunks(file_base),1):
print(f'Checked [{idx}0 Mb]')
for elem in read_lines(chunk):
if elem in check_set:
check_set.remove(elem)
print(f'Unique rows: [{len(check_set)}]')
UPD 11.11.2020: 感谢@ m9_psy提供提高性能的提示。真的更快!当前,最快的方法是:
from io import BytesIO
check_set = {elem for elem in BytesIO(open('check.txt','rb').read())}
with open('base.txt','rb') as file_base:
for line in file_base:
if line in check_set:
check_set.remove(line)
print(f'Unique rows: [{len(check_set)}]')
是否可以进一步加快此过程?
解决方法
我怀疑join
操作需要进行n * m
比较操作,其中n
和m
是两个数据帧的长度。
此外,您的描述和代码之间也存在不一致之处:
- “也就是说,您需要一个csv或txt文件,其行位于第二个文件中,而不在第一个文件中。” ⟶这表示在
dv_check
中,而不是dv_base
-
dv_check.join(dv_base,on='data',how='inner',inplace=True)
⟶这意味着dv_check
和dv_base
无论如何,一种想法是使用set
,因为检查集合中的成员资格的时间复杂度为O(1)
,而检查列表中的成员资格的复杂度为O(n)
。如果您熟悉SQL世界,这等效于从LOOP JOIN策略过渡到HASH JOIN策略:
# This will take care of removing the duplicates
base_set = set(dv_base['data'])
check_set = set(dv_check['data'])
# In `dv_check` but not `dv_base`
keys = check_set - base_set
# In both `dv_check` and `dv_base`
keys = check_set & base_set
这只会为您提供满足您条件的键。您仍然必须过滤两个数据框以获取其他属性。
在具有16GB RAM的2014年iMac上耗时1分14秒完成。
,让我们生成一个数据集来模仿您的示例
def Movezips(self,email):
time.sleep(2)
print("starting moving")
source_dir = 'C:/Download'+email+'/'
dst = 'E:/BackUpZip/'+email
Path(dst).mkdir(parents=True,exist_ok=True)
dst=dst+'/'
files = glob.iglob(os.path.join(source_dir,"*.zip"))
for file in files:
if os.path.isfile(file):
shutil.copy(file,dst)
for file in files:
if os.path.isfile(file):
shutil.rmtree(file)
现在,要查找支票中不在基础中的行,我们可以将其合并,然后删除不匹配的行:
import vaex
import numpy as np
N = 50_000_000 # 50 million rows for base
N2 = 1_000_000 # 1 million for check
M = 40+1+45 # chars for each string
N_dup = 10_000 # number of duplicate rows in the checks
s1 = np.random.randint(ord('a'),ord('z'),(N,M),np.uint32).view(f'U{M}').reshape(N)
s2 = np.random.randint(ord('a'),(N2,np.uint32).view(f'U{M}').reshape(N2)
# make sure s2 has rows that match s1
dups = np.random.choice(N2,N_dup,replace=False)
s2[dups] = s1[np.random.choice(N,replace=False)]
# save the data to disk
vaex.from_arrays(s=s1).export('/data/tmp/base.hdf5')
vaex.from_arrays(s=s2).export('/data/tmp/check.hdf5')
,
这是另一种方法。检查文件约为0.1 GB(适合内存)。基本文件最大为100 GB(因此一次处理一行)。
创建测试数据和生成器功能以导入数据
from io import StringIO
# test data for base (>50 million lines)
base_file = '''a
b
c
d
e
'''
# test data for check (1 million lines)
check_file = '''d
e
f
g
'''
def read_lines(filename):
''' Read data file one line at a time (with generator function).'''
handle = StringIO(filename)
for line in handle:
yield line.rstrip('\n')
仅在检查文件中查找元素(在@CodeDifferent示例中为check_set - base_set
)
check_set = {elem for elem in read_lines(check_file)}
for elem in read_lines(base_file):
if elem in check_set:
check_set.remove(elem)
print(check_set)
{'g','f'}
查找交集(在@CodeDifferent的示例中为check_set & base_set
)
check_set = {elem for elem in read_lines(check_file)}
common_elements = set()
for elem in read_lines(base_file):
if elem in check_set:
common_elements.add(elem)
print(common_elements)
{'d','e'}
我认为,当(a)基本文件比检查文件大得多,并且(b)基本文件对于内存中的数据结构太大时,这种方法最有效。
,注意!我的原始答案是错误的。 @codedifferent是正确的。这是我略有不同的版本。这可能对某人有帮助。我认为该文本文件仅包含一列。
import pandas as pd
filepath_check = './data/checkfile.csv'
filepath_base = './data/basefile.csv'
# load the small data into memory
dfcheck = pd.read_csv(filepath_check)
dfcheck = set(dfcheck['data'])
# but load the big data in chunk
chunk_iter = pd.read_csv(filepath_base,chunksize=100000)
# for each chunk,remove intersect if any.
for chunk in chunk_iter:
dfcheck = dfcheck - set(chunk['data'])
print(len(dfcheck))
# write result
with open('./results.txt','w') as filehandler:
for item in dfcheck:
filehandler.write('%s\n'% item)
旧答案
我现在也遇到了类似的问题。我的解决方案是使用Dask,但是可以肯定的是Vaex应该没问题。
import dask.dataframe as dd
base_file = dd.read_csv('./base_file.csv')
check_file = dd.read_csv('./check_file.csv')
base_file = base_file.set_index('data')
check_file = check_file.set_index('data')
base_file.to_parquet('./results/base_file',compression=None)
check_file.to_parquet('./results/base_file',compression=None)
base_file.read_parquet('./results/base_file')
check_file.read_parquet('./results/check_file')
merged = dd.merge(base_file,check_file,left_index=True,right_index=True)
# save to csv from dask dataframe
merged.to_csv('/results/dask_result.csv',single_file = True)
# or save to csv from pandas dataframe
pandas_merged = merged.compute() # convert to pandas
pandas_merged.to_csv(...)
- 为什么要使用set_index?这使加入过程更快。 https://docs.dask.org/en/latest/dataframe-best-practices.html#joins
- 为什么要保存到镶木地板上以供日后阅读?在我的经验中, 即使使用Dask,直接读取CSV也会占用更多内存。加载实木复合地板文件肯定更快。 https://docs.dask.org/en/latest/dataframe-best-practices.html#store-data-in-apache-parquet-format。在进行了诸如join和set_index之类的详尽工作之后,我有很多保存/加载行来保存结果。
- 如果check_file足够小,则可以在加载文件后或必要时使用
check_file = check_file.persist()
将整个check_file加载到内存中。