python多处理池并不总是使用所有工作者

问题:
当将1000个任务发送到apply_async时,它们在所有48个CPU上并行运行,但有时运行的CPU越来越少,直到只剩下一个CPU运行,并且只有当最后一个CPU完成其任务时,所有CPU才会继续运行每个人都有一个新任务.它不应该等待像这样的任何“任务批处理”..

我的(简化)代码:

from multiprocessing import Pool
pool = Pool(47)
tasks = [pool.apply_async(json2features,(j,)) for j in jsons]
feats = [t.get() for t in tasks]

jsons = […]是已加载到内存并解析为对象的大约1000个JSON的列表.
json2features(json)在json上执行一些CPU繁重的工作,并返回一个数字数组.
此功能可能需要1秒到15分钟才能运行,因此我使用启发式方法对jsons进行排序.希望最长的任务首先在列表中,因此首先开始.

json2features函数还会在任务完成时以及花费的时间内打印.它全部运行在一个拥有48个核心的ubuntu服务器上,就像我上面所说的那样,使用全部47个核心,它开始很棒.然后,当任务完成时,运行的核心越来越少,这听起来完全没问题,不是因为在最后一个核心完成之后(当我看到它打印到stdout时),所有CPU都开始在新任务上再次运行,这意味着这不是真正的清单结束.它可能会再次执行相同的操作,然后再次执行列表的实际结束.

有时它可以在5分钟内只使用一个核心,当任务最终完成时,它会在新任务上再次开始使用所有核心. (所以它不会停留在某些IPC开销上)

没有重复的jsons,也没有任何依赖关系(它们都是静态的,新鲜的磁盘数据,没有引用等等),也没有json2features调用之间的任何依赖关系(没有全局状态或任何东西),除了它们使用相同的终端他们的印刷品.

我怀疑问题是工作人员在调用get结果之前不会被释放,所以我尝试了以下代码:

from multiprocessing import Pool
pool = Pool(47)
tasks = [pool.apply_async(print,(i,)) for i in range(1000)]
# feats = [t.get() for t in tasks]

并且它会打印所有1000个数字,即使没有调用get.

我现在已经没想到问题可能是什么了.
这真的是Pool的正常行为吗?

非常感谢!

最佳答案
multiprocessing.Pool依赖于单个os.pipe将任务交付给worker.

通常在Unix上,默认管道大小范围为4到64 Kb.如果您提供的JSON大小很大,您可能会在任何给定的时间点堵塞管道.

这意味着,当其中一名工人忙于从管道中读取大型JSON时,所有其他工作人员都会饿死.

通过IPC共享大数据通常是一种不好的做法,因为它会导致性能不佳.这在multiprocessing programming guidelines中甚至得到了强调.

Avoid shared state

As far as possible one should try to avoid shifting large amounts of data between processes.

不要在主进程中读取JSON文件,只需向工作人员发送文件名,然后让他们打开并阅读内容.您肯定会注意到性能的提高,因为您也在并发域中移动JSON加载阶段.

请注意,结果也是如此.单个os.pipe也用于将结果返回到主进程.如果一个或多个工作人员阻塞了结果管道,那么您将获得等待主管道排除它的所有进程.应将大结果写入文件.然后,您可以利用主进程上的多线程快速回读文件中的结果.

相关文章

Python中的函数(二) 在上一篇文章中提到了Python中函数的定...
Python中的字符串 可能大多数人在学习C语言的时候,最先接触...
Python 面向对象编程(一) 虽然Python是解释性语言,但是它...
Python面向对象编程(二) 在前面一篇文章中谈到了类的基本定...
Python中的函数(一) 接触过C语言的朋友对函数这个词肯定非...
在windows下如何快速搭建web.py开发框架 用Python进行web开发...