一旦多处理在 Python 中结束,如何合并多个文件?

问题描述

在我的代码中,multiprocessing Process 用于同时生成多个 impdp 作业(导入),并且每个作业生成一个具有动态名称的日志文件

'/DP_IMP_' + DP_PDB_FULL_NAME[i] + '' + DP_WORKLOAD + '' + str(vardate) + '.log'

vardate = datetime.Now().strftime("%d-%b-%Y-%I_%M_%s_%p")
tempfiles = []
for i in range((len(DP_PDB_FULL_NAME))):
        for DP_WORKLOAD in DP_WORKLOAD_NAME:
                 tempfiles.append(logdir + '/DP_IMP_' + DP_PDB_FULL_NAME[i] + '_' + DP_WORKLOAD  +  '_' + str(vardate) + '.log')
                 p1 = multiprocessing.Process(target=imp_workload,args=(DP_WORKLOAD,DP_DURATION_SECONDS,vardate,))
                 p1.start()

一旦所有进程都结束,我想将所有创建的日志文件合并到一个大的主日志文件中。但是,当我尝试在 (for i in range((len(DP_PDB_FULL_NAME))) 循环下使用类似的东西时:

with open('DATAPUMP_IMP_' + str(vardate) + '.log','wb') as wfd:
    for f in tempfiles:
        with open(f,'rb') as fd:
            shutil.copyfileobj(fd,wfd)

然后它会尝试在进程结束之前写入文件

此处,DP_PDB_FULL_NAME 是多个数据库的列表,因此多个进程在多个数据库中同时生成。当我尝试在循环结束后添加 p1.join() 时,多个数据库中不会发生多处理。

那么,一旦所有单独的进程都完成了,我应该如何创建一个主日志文件

解决方法

您应该创建某种结构来存储所需的变量和进程句柄。在该循环之后用 join 阻塞,直到所有子进程完成,然后处理结果文件。

handles = []
for i in range(10):
    p = Process()
    p.start()
    handles.append(p)

for handle in handles:
    handle.join()

,

所以,我在第一个循环结束后添加了 p1.join(),现在它可以工作了!

vardate = datetime.now().strftime("%d-%b-%Y-%I_%M_%S_%p")
tempfiles = []
for i in range((len(DP_PDB_FULL_NAME))):
        for DP_WORKLOAD in DP_WORKLOAD_NAME:
                 tempfiles.append(logdir + '/DP_IMP_' + DP_PDB_FULL_NAME[i] + '_' + DP_WORKLOAD  +  '_' + str(vardate) + '.log')
                 p1 = multiprocessing.Process(target=imp_workload,args=(DP_WORKLOAD,DP_DURATION_SECONDS,vardate,))
                 p1.start()
p1.join()

with open('DATAPUMP_IMP_' + str(vardate) + '.log','wb') as wfd:
    for f in tempfiles:
        with open(f,'rb') as fd:

进一步解释一下,在上述场景中添加连接有三种情况,多处理相应地工作。

  1. 在最里面的 for 循环中:

    因此,如果在此处添加 join,则多处理根本无法工作,因为它就在 proc.start() 之后

for i in range((len(DP_PDB_FULL_NAME))):
        for DP_WORKLOAD in DP_WORKLOAD_NAME:
                 tempfiles.append(logdir + '/DP_IMP_' + str(vardate) + '.log')
                 p1 = multiprocessing.Process(target=imp_workload,))
                 p1.start()
                 p1.join()
  1. 外层 for 循环内(最内层 for 循环外)

    在这里,多处理仅适用于内部循环,而不适用于多个数据库

for i in range((len(DP_PDB_FULL_NAME))):
        for DP_WORKLOAD in DP_WORKLOAD_NAME:
                 tempfiles.append(logdir + '/DP_IMP_' + str(vardate) + '.log')
                 p1 = multiprocessing.Process(target=imp_workload,))
                 p1.start()
        p1.join()
  1. 在外部 for 循环之外

    这是正确的解决方案(如上所述),它位于使用多处理的所有循环之外。