问题描述
在我的代码中,multiprocessing Process 用于同时生成多个 impdp 作业(导入),并且每个作业生成一个具有动态名称的日志文件:
'/DP_IMP_' + DP_PDB_FULL_NAME[i] + '' + DP_WORKLOAD + '' + str(vardate) + '.log'
vardate = datetime.Now().strftime("%d-%b-%Y-%I_%M_%s_%p")
tempfiles = []
for i in range((len(DP_PDB_FULL_NAME))):
for DP_WORKLOAD in DP_WORKLOAD_NAME:
tempfiles.append(logdir + '/DP_IMP_' + DP_PDB_FULL_NAME[i] + '_' + DP_WORKLOAD + '_' + str(vardate) + '.log')
p1 = multiprocessing.Process(target=imp_workload,args=(DP_WORKLOAD,DP_DURATION_SECONDS,vardate,))
p1.start()
一旦所有进程都结束,我想将所有创建的日志文件合并到一个大的主日志文件中。但是,当我尝试在 (for i in range((len(DP_PDB_FULL_NAME))) 循环下使用类似的东西时:
with open('DATAPUMP_IMP_' + str(vardate) + '.log','wb') as wfd:
for f in tempfiles:
with open(f,'rb') as fd:
shutil.copyfileobj(fd,wfd)
然后它会尝试在进程结束之前写入文件。
此处,DP_PDB_FULL_NAME 是多个数据库的列表,因此多个进程在多个数据库中同时生成。当我尝试在循环结束后添加 p1.join()
时,多个数据库中不会发生多处理。
那么,一旦所有单独的进程都完成了,我应该如何创建一个主日志文件?
解决方法
您应该创建某种结构来存储所需的变量和进程句柄。在该循环之后用 join 阻塞,直到所有子进程完成,然后处理结果文件。
handles = []
for i in range(10):
p = Process()
p.start()
handles.append(p)
for handle in handles:
handle.join()
,
所以,我在第一个循环结束后添加了 p1.join()
,现在它可以工作了!
vardate = datetime.now().strftime("%d-%b-%Y-%I_%M_%S_%p")
tempfiles = []
for i in range((len(DP_PDB_FULL_NAME))):
for DP_WORKLOAD in DP_WORKLOAD_NAME:
tempfiles.append(logdir + '/DP_IMP_' + DP_PDB_FULL_NAME[i] + '_' + DP_WORKLOAD + '_' + str(vardate) + '.log')
p1 = multiprocessing.Process(target=imp_workload,args=(DP_WORKLOAD,DP_DURATION_SECONDS,vardate,))
p1.start()
p1.join()
with open('DATAPUMP_IMP_' + str(vardate) + '.log','wb') as wfd:
for f in tempfiles:
with open(f,'rb') as fd:
进一步解释一下,在上述场景中添加连接有三种情况,多处理相应地工作。
-
在最里面的 for 循环中:
因此,如果在此处添加 join,则多处理根本无法工作,因为它就在 proc.start() 之后
for i in range((len(DP_PDB_FULL_NAME))):
for DP_WORKLOAD in DP_WORKLOAD_NAME:
tempfiles.append(logdir + '/DP_IMP_' + str(vardate) + '.log')
p1 = multiprocessing.Process(target=imp_workload,))
p1.start()
p1.join()
-
外层 for 循环内(最内层 for 循环外)
在这里,多处理仅适用于内部循环,而不适用于多个数据库
for i in range((len(DP_PDB_FULL_NAME))):
for DP_WORKLOAD in DP_WORKLOAD_NAME:
tempfiles.append(logdir + '/DP_IMP_' + str(vardate) + '.log')
p1 = multiprocessing.Process(target=imp_workload,))
p1.start()
p1.join()
-
在外部 for 循环之外
这是正确的解决方案(如上所述),它位于使用多处理的所有循环之外。