问题描述
我需要加快处理涉及相当大数据集的任务的处理时间,这些数据集从包含CSV数据的最大1.5GB大型pickle文件中加载。我从python的多处理开始,但是对于难以挑剔的类对象,我不得不切换到悲痛。我运行了一些并行代码,并复制了从通常的串行运行中获得的结果:到目前为止,一切都很好。但是处理速度远不是中央主节点运行满油门时有用的任何东西,而实际子流程(有时总共有数百个) 串行而不是并行运行,由主节点又一次疯狂地运转着巨大的时间间隔-为什么呢? “最好”与Processpool一起使用,而apipe和amap相似,没有区别。
下面是我的代码摘录,首先是并行尝试,其次是串行部分。两者都给出相同的结果,但是并行方法要慢得多。重要的是,每个并行子进程使用的时间与串行循环中使用的时间大致相同。所有变量都在较长的处理管道中提前加载了。
#### ParaLLEL multiprocessing
if me.MP:
import pathos
cpuN = pathos.multiprocessing.cpu_count() - 1
pool = pathos.pools.Processpool( cpuN) # ThreadPool ParallelPool
argsIn1 = [] # a mid-large complex dictionary
argsIn2 = [] # a very large complex dictionary (CSV pickle of 400MB or more)
argsIn3 = [] # a list of strings
argsIn4 = [] # a brief string
for Uscr in UID_lst:
argsIn1.append( me)
argsIn2.append( Db)
argsIn3.append( UID_lst)
argsIn4.append( Uscr)
result_pool = pool.amap( functionX,argsIn1,argsIn2,argsIn3,argsIn4)
results = result_pool.get()
for result in results:
[ meTU,U] = result
me.v[T][U] = meTU[U] # insert result !
#### SERIAL processing
else:
for Uscr in UID_lst:
meTU,U = functionX( me,Db,UID_lst,Uscr)
me.v[T][U] = meTU[U] # insert result !
我在两台Linux机器上测试了此代码,这是i3 cpu(具有32GB RAM,slackware 14.2,python 3.7)和2 * Xeon盒子(具有64GB RAM,slackware current,python 3.8)。在pip3中安装了pathos 0.2.6。如前所述,两台机器在此处显示的代码都显示出相同的速度问题。
我在这里想念什么?
ADDENDUM:似乎只有第一个PID通过UID_lst中的所有项来完成整个工作-而其他10个子进程则处于空闲状态,什么也没有等待,如top和os.getpid()所示。在此示例中,cpuN为11。
附录2:对这个新修订感到抱歉,但是在不同的负载下运行此代码(要解决许多工作)最终涉及的不仅仅是一个子进程忙,而是经过了很长时间。这是最上面的输出:
top - 14:09:28 up 19 days,4:04,3 users,load average: 6.75,6.20,5.08
Tasks: 243 total,6 running,236 sleeping,0 stopped,1 zombie
%cpu(s): 48.8 us,1.2 sy,0.0 ni,49.9 id,0.0 wa,0.0 hi,0.0 si,0.0 st
MiB Mem : 64061.6 total,2873.6 free,33490.9 used,27697.1 buff/cache
MiB Swap: 0.0 total,0.0 free,0.0 used. 29752.0 avail Mem
PID USER PR NI VIRT RES SHR S %cpu %MEM TIME+ COMMAND
5441 userx 20 0 6597672 4.9g 63372 S 100.3 7.9 40:29.08 python3 do_Db_job
5475 userx 20 0 6252176 4.7g 8828 R 100.0 7.5 9:24.46 python3 do_Db_job
5473 userx 20 0 6260616 4.7g 8828 R 100.0 7.6 17:02.44 python3 do_Db_job
5476 userx 20 0 6252432 4.7g 8828 R 100.0 7.5 5:37.52 python3 do_Db_job
5477 userx 20 0 6252432 4.7g 8812 R 100.0 7.5 1:48.18 python3 do_Db_job
5474 userx 20 0 6253008 4.7g 8828 R 99.7 7.5 13:13.38 python3 do_Db_job
1353 userx 20 0 9412 4128 3376 S 0.0 0.0 0:59.63 sshd: userx@pts/0
1354 userx 20 0 7960 4692 3360 S 0.0 0.0 0:00.20 -bash
1369 userx 20 0 9780 4212 3040 S 0.0 0.0 31:16.80 sshd: userx@pts/1
1370 userx 20 0 7940 4632 3324 S 0.0 0.0 0:00.16 -bash
4545 userx 20 0 5016 3364 2296 R 0.0 0.0 3:01.76 top
5437 userx 20 0 19920 13280 6544 S 0.0 0.0 0:00.07 python3
5467 userx 20 0 0 0 0 Z 0.0 0.0 0:00.00 [git] <defunct>
5468 userx 20 0 3911460 2.5g 9148 S 0.0 4.0 17:48.90 python3 do_Db_job
5469 userx 20 0 3904568 2.5g 9148 S 0.0 4.0 16:13.19 python3 do_Db_job
5470 userx 20 0 3905408 2.5g 9148 S 0.0 4.0 16:34.32 python3 do_Db_job
5471 userx 20 0 3865764 2.4g 9148 S 0.0 3.9 18:35.39 python3 do_Db_job
5472 userx 20 0 3872140 2.5g 9148 S 0.0 3.9 20:43.44 python3 do_Db_job
5478 userx 20 0 3844492 2.4g 4252 S 0.0 3.9 0:00.00 python3 do_Db_job
27052 userx 20 0 9412 3784 3052 S 0.0 0.0 0:00.02 sshd: userx@pts/2
27054 userx 20 0 7932 4520 3224 S 0.0 0.0 0:00.01 -bash
在我看来,最多有6个子进程将在任何给定时间运行,这可能对应于psutil.cpu_count(logical = False)= 6,而不是pathos.multiprocessing.cpu_count()= 12 ... ?
解决方法
实际上,问题已解决-事实证明,在我的代码开发阶段,从来没有把它放在第一位。问题出在其他地方:提供给工作者进程的变量非常大,有时为数GB。即使在新的双至强机器上,这种情况也将使主/中央节点永远充满忙碌的莳萝/残渣(例如泡菜/解开酱),更不用说旧的i3 CPU盒了。在使用前者的情况下,我看到多达6或7个工人(在11个工人中)处于活动状态,而在后者中,甚至从来没有超过1个在役工人,即使对于前者,它也要花费大量时间,数十分钟才能看到一些工人聚集在顶部。
因此,我将需要调整代码,以便每个工作人员都必须从磁盘/网络中重新读取巨大的变量-这也需要一些时间,但是将中央节点从此愚蠢的重复任务中解放出来是有意义的,但是而是给它一个机会来完成其设计的工作,即计划和组织工人的演出。
我也很高兴地说,与传统的串行版本相比,并行运行的结果输出为CSV文件(wc:36722 1133356 90870757)是相同的。
话虽如此,我真的很惊讶使用python / pathos多么方便-不必在串行和并行运行之间更改相关的工作程序代码!