如何快速比较两个文本文件并获得唯一的行?

问题描述

我有2个文本文件(* .txt),其中包含以下格式的唯一字符串:

udtvbacfbbxfdffzpwsqzxyznecbqxgebuudzgzn:refmfxaawuuilznjrxuogrjqhlmhslkmprdxbascpoxda
ltswbjfsnejkaxyzwyjyfggjynndwkivegqdarjg:qyktyzugbgclpovyvmgtkihxqisuawesmcvsjzukcbrzi

一个文件包含 5000万行(4.3 GB),第二个文件包含 100万行(112 MB)。一行包含40个字符,定界符:和另外45个字符。

任务:获取两个文件的唯一值。即,您需要一个 csv或txt 文件,其中第二行中有行,而第二行中没有行首先。

我正在尝试使用 vaex Vaex):

import vaex

base_files = ['file1.txt']
for i,txt_file in enumerate(base_files,1):
    for j,dv in enumerate(vaex.from_csv(txt_file,chunk_size=5_000_000,names=['data']),1):
        dv.export_hdf5(f'hdf5_base/base_{i:02}_{j:02}.hdf5')

check_files = ['file2.txt']
for i,txt_file in enumerate(check_files,1):
        dv.export_hdf5(f'hdf5_check/check_{i:02}_{j:02}.hdf5')

dv_base = vaex.open('hdf5_base/*.hdf5')
dv_check = vaex.open('hdf5_check/*.hdf5')
dv_result = dv_check.join(dv_base,on='data',how='inner',inplace=True)
dv_result.export(path='result.csv')

结果,我得到了具有唯一行值的 result.csv 文件。但是验证过程需要很长时间。此外,它使用所有可用的RAM和所有处理器资源。 如何加快这一过程?我究竟做错了什么?有什么可以做得更好的?值得使用其他库(pandas,dask)进行检查吗?它们会更快吗?


UPD 10.11.2020 到目前为止,我还没有找到比以下选项更快的东西:

from io import StringIO


def read_lines(filename):
    handle = StringIO(filename)
    for line in handle:
        yield line.rstrip('\n')


def read_in_chunks(file_obj,chunk_size=10485760):
    while True:
        data = file_obj.read(chunk_size)
        if not data:
            break
        yield data


file_check = open('check.txt','r',errors='ignore').read()

check_set = {elem for elem in read_lines(file_check)}

with open(file='base.txt',mode='r',errors='ignore') as file_base:
    for idx,chunk in enumerate(read_in_chunks(file_base),1):
        print(f'Checked [{idx}0 Mb]')
        for elem in read_lines(chunk):
            if elem in check_set:
                check_set.remove(elem)

print(f'Unique rows: [{len(check_set)}]')

UPD 11.11.2020: 感谢@ m9_psy提供提高性能提示。真的更快!当前,最快的方法是:

from io import BytesIO

check_set = {elem for elem in BytesIO(open('check.txt','rb').read())}

with open('base.txt','rb') as file_base:
    for line in file_base:
        if line in check_set:
            check_set.remove(line)

print(f'Unique rows: [{len(check_set)}]')

是否可以进一步加快此过程?

解决方法

我怀疑join操作需要进行n * m比较操作,其中nm是两个数据帧的长度。

此外,您的描述和代码之间也存在不一致之处:

  • “也就是说,您需要一个csv或txt文件,其行位于第二个文件中,而不在第一个文件中。” ⟶这表示在dv_check中,而不是dv_base
  • dv_check.join(dv_base,on='data',how='inner',inplace=True)⟶这意味着dv_checkdv_base

无论如何,一种想法是使用set,因为检查集合中的成员资格的时间复杂度为O(1),而检查列表中的成员资格的复杂度为O(n)。如果您熟悉SQL世界,这等效于从LOOP JOIN策略过渡到HASH JOIN策略:

# This will take care of removing the duplicates
base_set = set(dv_base['data'])
check_set = set(dv_check['data'])

# In `dv_check` but not `dv_base`
keys = check_set - base_set

# In both `dv_check` and `dv_base`
keys = check_set & base_set

这只会为您提供满足您条件的键。您仍然必须过滤两个数据框以获取其他属性。

在具有16GB RAM的2014年iMac上耗时1分14秒完成。

,

让我们生成一个数据集来模仿您的示例

  def Movezips(self,email):
        time.sleep(2)
        print("starting moving")
        source_dir = 'C:/Download'+email+'/'
        dst = 'E:/BackUpZip/'+email
        Path(dst).mkdir(parents=True,exist_ok=True)
        dst=dst+'/'
        files = glob.iglob(os.path.join(source_dir,"*.zip"))
        for file in files:
             if os.path.isfile(file):
                shutil.copy(file,dst)

        for file in files:
            if os.path.isfile(file):
                shutil.rmtree(file)

现在,要查找支票中不在基础中的行,我们可以将其合并,然后删除不匹配的行:

import vaex
import numpy as np
N = 50_000_000  # 50 million rows for base
N2 = 1_000_000  # 1 million for check
M = 40+1+45     # chars for each string
N_dup = 10_000  # number of duplicate rows in the checks

s1 = np.random.randint(ord('a'),ord('z'),(N,M),np.uint32).view(f'U{M}').reshape(N)
s2 = np.random.randint(ord('a'),(N2,np.uint32).view(f'U{M}').reshape(N2)
# make sure s2 has rows that match s1
dups = np.random.choice(N2,N_dup,replace=False)
s2[dups] = s1[np.random.choice(N,replace=False)]

# save the data to disk
vaex.from_arrays(s=s1).export('/data/tmp/base.hdf5')
vaex.from_arrays(s=s2).export('/data/tmp/check.hdf5')
,

这是另一种方法。检查文件约为0.1 GB(适合内存)。基本文件最大为100 GB(因此一次处理一行)。

创建测试数据和生成器功能以导入数据

from io import StringIO

# test data for base (>50 million lines)
base_file = '''a
b
c
d
e
'''

# test data for check (1 million lines)
check_file = '''d
e
f
g
'''

def read_lines(filename):
    ''' Read data file one line at a time (with generator function).'''
    handle = StringIO(filename)
    for line in handle:
        yield line.rstrip('\n')

仅在检查文件中查找元素(在@CodeDifferent示例中为check_set - base_set

check_set = {elem for elem in read_lines(check_file)}

for elem in read_lines(base_file):
    if elem in check_set:
        check_set.remove(elem)
print(check_set)
{'g','f'}

查找交集(在@CodeDifferent的示例中为check_set & base_set

check_set = {elem for elem in read_lines(check_file)}

common_elements = set()
for elem in read_lines(base_file):
    if elem in check_set:
        common_elements.add(elem)
print(common_elements)
{'d','e'}

我认为,当(a)基本文件比检查文件大得多,并且(b)基本文件对于内存中的数据结构太大时,这种方法最有效。

,

注意!我的原始答案是错误的。 @codedifferent是正确的。这是我略有不同的版本。这可能对某人有帮助。我认为该文本文件仅包含一列。

import pandas as pd

filepath_check = './data/checkfile.csv'
filepath_base = './data/basefile.csv'

# load the small data into memory
dfcheck = pd.read_csv(filepath_check)
dfcheck = set(dfcheck['data'])

# but load the big data in chunk
chunk_iter = pd.read_csv(filepath_base,chunksize=100000)

# for each chunk,remove intersect if any.
for chunk in chunk_iter:
    dfcheck = dfcheck - set(chunk['data'])
    print(len(dfcheck))

# write result
with open('./results.txt','w') as filehandler:
    for item in dfcheck:
        filehandler.write('%s\n'% item)

旧答案

我现在也遇到了类似的问题。我的解决方案是使用Dask,但是可以肯定的是Vaex应该没问题。

import dask.dataframe as dd

base_file = dd.read_csv('./base_file.csv')
check_file = dd.read_csv('./check_file.csv')

base_file = base_file.set_index('data')
check_file = check_file.set_index('data')

base_file.to_parquet('./results/base_file',compression=None)
check_file.to_parquet('./results/base_file',compression=None)

base_file.read_parquet('./results/base_file')
check_file.read_parquet('./results/check_file')
merged = dd.merge(base_file,check_file,left_index=True,right_index=True)

# save to csv from dask dataframe
merged.to_csv('/results/dask_result.csv',single_file = True)

# or save to csv from pandas dataframe
pandas_merged = merged.compute() # convert to pandas
pandas_merged.to_csv(...)
  1. 为什么要使用set_index?这使加入过程更快。 https://docs.dask.org/en/latest/dataframe-best-practices.html#joins
  2. 为什么要保存到镶木地板上以供日后阅读?在我的经验中, 即使使用Dask,直接读取CSV也会占用更多内存。加载实木复合地板文件肯定更快。 https://docs.dask.org/en/latest/dataframe-best-practices.html#store-data-in-apache-parquet-format。在进行了诸如join和set_index之类的详尽工作之后,我有很多保存/加载行来保存结果。
  3. 如果check_file足够小,则可以在加载文件后或必要时使用check_file = check_file.persist()将整个check_file加载到内存中。

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...