如何从 Python 中的两对文件中获取枢轴线? 通用合并扫描和书写

问题描述

How to get the pivot lines from two tab-separated files? 开始,有一种使用 unix 命令从两个文件中透视行的快速方法

如果我们有两对文件

  • f1af1b
  • f2af2b

目标是提供一个 3 列制表符分隔文件,其中包括

  • f1a / f2a
  • f1b
  • f2b

其中 f1a / f2a文件中同时出现在 f1af1b 中的行:

我尝试了以下可行的方法,但如果文件非常大,则存储 f1f2 字典将需要大量内存。例如。数十亿行的文件

import sys
from tqdm import tqdm 

f1a,f1b,f2a,f2b = sys.argv[1],sys.argv[2],sys.argv[3],sys.argv[4]


# Read first pair of file into memory.
with open(f1a) as fin_f1a,open(f1a) as fin_f1b:
  f1 = {s.strip().replace('\t',' ') :t.strip().replace('\t',' ') for s,t in tqdm(zip(fin_f1a,fin_f1b))}

with open(s2) as fin_f2a,open(t2) as fin_f2b:
  f2 = {s.strip().replace('\t',t in tqdm(zip(fin_f2a,fin_f2b))}


with open('piVoted.tsv','w') as fout:
  for s in tqdm(f1.keys() & f2.keys()):
    print('\t'.join([s,f1[s],f2[s]]),end='\n',file=fout)

是否有更快/更好/更简单的方法来在 Python 中实现相同的 3 列制表符分隔文件是否有可以对大文件有效执行此类操作的库?


使用 turicreate.SFrame,我还可以:

from turicreate import SFrame

f1a,sys.argv[4]

sf1a = SFrame.read_csv(f1a,delimited='\0',header=False)
sf1b = SFrame.read_csv(f1b,header=False) 

sf2a = SFrame.read_csv(f2a,header=False)
sf2b = SFrame.read_csv(f2b,header=False)

sf1 = sf1a.join(sf1b) 
sf2 = sf2a.join(sf2b)

sf = sf1.join(sf2,on='X1',how='left') 
sf.save('piVoted')

解决方法

通用合并

zip 函数不会存储可迭代对象的整个副本。所以我们可以安全地使用它。

假设您有两个按第一列升序排序的可迭代对象,您可以按如下方式连接两个表。

def merge(t1,t2):
    end = object()
    end_ = end,None
    a1,b1 = next(t1,end_)
    a2,b2 = next(t2,end_)
    while a1 is not end and a2 is not end:
        if a1 < a2:
            a1,end_)
        elif a1 > a2:
            a2,end_)
        else:
            yield a1,b1,b2
            a1,end_)
            a2,end_)

使用两个迭代器调用合并并生成第三个迭代器,并且每次只需要存储每个迭代器的一个元素。

list(merge(iter([(0,1),(1,(3,2)]),iter([(0,'a'),'b'),(2,'c'),'d'),(4,'e')])))
[(0,1,2,'d')]

扫描和书写

为了防止整个文件被存储,我有一个扫描方法,它会读取每个文件一次yield一行。

def scan(fa,fb):
    for a,b in zip(fa,fb):
        a = a.strip().replace('\t',' ')
        b = b.strip().replace('\t',' ')
        yield a,b
def scan_by_name(fa,fb):
    with open(fa) as fha,open(fb) as fhb:
        yield from scan(fha,fhb)

然后你可以用这种方式解决你的问题(未经测试,我没有你的文件)

with open('pivoted.tsv','w') as fout:
    t1 = scan_by_name(f1a,f1b)
    t2 = scan_by_name(f2a,f2b)
    for row in merge(t1,t2):
        print('\t'.join(row),end='\n',file=fout)
,

正如 leangaurav 所建议的,可以使用 Dask 来完成。

优点是我们可以制作一个在线程(或进程)池中运行并读取文件块(使用很少的 RAM)而不必担心这一点的解决方案。

例如,我们创建一些测试数据:

from string import ascii_lowercase
from random import choices

def rand_str(k=3):
    return " ".join("".join(choices(ascii_lowercase,k=k)) for _ in range(2))

N = 2_000

for file_name in ["example_a.txt","example_b.txt"]:
    with open(file_name,"w") as f:
        for _ in range(N):
            line = f"{rand_str()} \t {rand_str()}\n"
            f.write(line)

然后我们用 dask 读取数据,我们指出哪一列将成为索引并进行合并:

from dask import compute
import dask.dataframe as dd

# this does not process anything yet 
df_a = dd.read_csv("example_a.txt",sep="\t",names=["pivot","data"]).set_index("pivot")
df_b = dd.read_csv("example_b.txt","data"]).set_index("pivot")

# this is the heavy part
result = dask.compute(dd.merge(df_a,df_b,left_index=True,right_index=True))
# save the output
result[0].to_csv("out.txt",header=False)

在旧笔记本上针对不同 N 的一些测试(仅考虑计算步骤):

  • N = 500_000 -> 11 秒
  • N = 1_000_000 -> 25 秒
  • N = 2_000_000 -> 44 秒
  • N = 4_000_000 -> 1m33s
,

将立即评估对字典的理解。

如果您不喜欢流式传输,请尝试对数据进行分段。

def oneSegment(first_letter)
# Read first pair of file into memory.
with open(f1a) as fin_f1a,open(f1a) as fin_f1b:
f1 = {s.strip().replace('\t',' ') :t.strip().replace('\t',' ') for s,t in tqdm(zip(fin_f1a,fin_f1b)) if s.strip().replace('\t',' ').startwith(first_letter)}

with open(s2) as fin_f2a,open(t2) as fin_f2b:
f2 = {s.strip().replace('\t',t in tqdm(zip(fin_f2a,fin_f2b)) if s.strip().replace('\t',' ').startwith(first_letter)}

with open('pivoted.tsv','a') as fout:
for s in tqdm(f1.keys() & f2.keys()):
    print('\t'.join([s,f1[s],f2[s]]),file=fout)
oneSegment("a")
,

此解决方案假设您在 linux 上运行 Python 代码,您可以通过 os.system 运行一系列 linux 命令来实现此目的。某些步骤可以合并为一个命令,也可以作为单独的进程并行运行以提高速度。
同一组中的每个任务可以并行运行。 (1,3,4),(5,6),(7,8),(9,10,11)

import os

f1a = "f1a"
f1b = "f1b"
f2a = "f2a"
f2b = "f2b"

# replace \t with space
os.system(f"sed -i 's/\t/ /g' {f1a}") #1
os.system(f"sed -i 's/\t/ /g' {f1b}") #2
os.system(f"sed -i 's/\t/ /g' {f2a}") #3
os.system(f"sed -i 's/\t/ /g' {f2b}") #4

# join the pair of files with \t
os.system(f"paste {f1a} {f1b} > f1_t") #5 join f1a and f1b with \t delimiter
os.system(f"paste {f2a} {f2b} > f2_t") #6 join f1a and f1b with \t delimiter

# prepare data for join: sort the files
os.system(f"sort f1_t > f1") #7 
os.system(f"sort f2_t > f2") #8 

os.system(f"join f1 f2 -j 1 -t '\t'> f12") #9 lines common to both f1,f2: -j 1
os.system(f"join f1 f2 -v 1 -j 1 -t '\t'> f11") #10 lines present only in f1 : -v 1
os.system(f"join f1 f2 -v 2 -j 1 -t '\t'> f22") #11 lines present only in f2 : -v 2

os.system(f"cat f12  f11 f22 > f") #12 join into final result f

由于问题提到的是 python 而不是平台,对于平台中立方法,请查看 dask

这个answer解决了一部分问题,就是合并大文件。但是加入文件集等仍然需要弄清楚,Dask也应该可以做到。或者预处理可以通过上面的代码(甚至代码之外)完成,合并可以用 Dask 完成。另请检查 this 答案。

在处理大数据块时,排序使事情变得更好。
this dask 文档中查看有关 indexes 的详细信息。

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...