问题描述
考虑以下简单的自包含python3程序:
import asyncio
import multiprocessing
import time
loop = asyncio.get_event_loop()
def sub_process(event,i):
async def async_function():
await loop.run_in_executor(None,event.wait) # <-- problematic line
print(f"successfully awaited on process {i}!") # <-- not all processes reach this line!
loop.run_until_complete(async_function())
if __name__ == '__main__':
event = multiprocessing.Event()
processes = [multiprocessing.Process(target=sub_process,args=(event,i)) for i in range(multiprocessing.cpu_count())]
for process in processes:
process.start()
time.sleep(2)
event.set()
for process in processes:
process.join()
print("success.")
我遇到了不确定性故障,其中某些(但不是全部)过程无法通过“有问题的”行,即,即使在设置事件之后也无法进行。未能解除阻止的进程数量也是不确定的(通常在我的16vcpu AWS实例上为12-16之间)。我仅能够在Linux机器(而非MacOS)上重现此内容。
与asyncio
相比,multiprocessing
的问题似乎更多。我可以这样说是因为,如果我用包装了event.wait
的简单同步函数替换了event.wait()
,则同步函数确实继续经过event.wait()
调用,即: / p>
def sub_process(event,i):
def sync_function():
event.wait()
print("this line is reached...")
async def async_function():
await loop.run_in_executor(None,sync_function)
print("...but this line is not.")
loop.run_until_complete(async_function())
当恰好一个进程无法解除阻塞并且我通过调用ctrl + C终止时,我得到以下堆栈跟踪:
Process Process-16:
Traceback (most recent call last):
File "temp.py",line 21,in <module>
process.join()
File "/usr/lib/python3.8/multiprocessing/process.py",line 149,in join
res = self._popen.wait(timeout)
File "/usr/lib/python3.8/multiprocessing/popen_fork.py",line 47,in wait
return self.poll(os.WNOHANG if timeout == 0.0 else 0)
File "/usr/lib/python3.8/multiprocessing/popen_fork.py",line 27,in poll
Traceback (most recent call last):
pid,sts = os.waitpid(self.pid,flag)
File "/usr/lib/python3.8/multiprocessing/process.py",line 315,in _bootstrap
self.run()
File "/usr/lib/python3.8/multiprocessing/process.py",line 108,in run
self._target(*self._args,**self._kwargs)
KeyboardInterrupt
File "temp.py",line 10,in sub_process
loop.run_until_complete(async_function())
File "/usr/lib/python3.8/asyncio/base_events.py",line 603,in run_until_complete
self.run_forever()
File "/usr/lib/python3.8/asyncio/base_events.py",line 570,in run_forever
self._run_once()
File "/usr/lib/python3.8/asyncio/base_events.py",line 1823,in _run_once
event_list = self._selector.select(timeout)
File "/usr/lib/python3.8/selectors.py",line 468,in select
fd_event_list = self._selector.poll(timeout,max_ev)
KeyboardInterrupt
任何见识将不胜感激!谢谢。
编辑:在multiprocessing.set_start_method('spawn')
下方添加if __name__ == '__main__':
似乎可以解决该问题;谢谢@ user4815162342。我仍然想更多地了解正在发生的事情。
解决方法
自我回答:在multiprocessing.set_start_method('spawn')
下方添加if __name__ == '__main__'
可解决此问题。感谢@ user4815162342指出这一点。
原因似乎与多线程和派生的一些底层不兼容有关;例如,“请注意,安全地分叉多线程进程是有问题的。” (来自multiprocessing docs)。请注意,loop.run_in_executor
在后台使用ThreadPoolExecutor
;这就是为什么该程序首先是多线程的(可能不会立即显现出来)。
似乎在https://bugs.python.org/issue33725上提供了更多详细信息。尽管这些细节对我来说完全不为人所知,但是MacOS似乎并没有为派生多线程进程提供良好的底层支持。
, TL; DR 将multiprocessing.set_start_method('spawn')
放在脚本顶部,以解决Linux上的问题。由于spawn
是那里的默认启动方法,因此无法在MacOS上重现该问题。
类似Unix的操作系统本身为并行提供fork primitive。对fork()
的调用有效地复制了当前进程并继续执行两者。 fork()
仍被广泛用于执行其他程序,其中对fork()
的调用后紧跟exec()
。如今,派生已很少用于并行性,在很大程度上由多线程取代,在多线程中,所有线程共享内存,并且不需要昂贵的进程间通信。但是,fork()
似乎是multiprocessing
的理想工具,它需要多个单独的进程运行同一可执行文件(Python解释器),这正是fork所提供的。分叉子进程在父进程离开的地方继续进行的事实也是一个好处,因为父进程中加载的所有Python模块都会自动出现在子进程中。
在类似Unix的系统上,multiprocessing
通过派生当前进程并继续运行侦听输入队列的代码来创建工作进程,以准备执行用户提供的作业。在不提供fork()
的Windows上,它使用不同的方法:它通过执行全新的python.exe
来创建工作程序,并使用命令行参数和环境变量来指示其引导至多处理工人。基于Spawn的方法的缺点是初始化工作池的速度明显慢,因为它需要池中每个工作人员使用一个全新的Python实例,这对于导入大型库的进程可能会花费很多时间。它也需要更多的内存,因为与fork不同,fork部署copy-on-write来保存在重复的内存页上,运行新的可执行文件在父级和子级之间几乎没有任何共享。另一方面,产卵的好处是每个工人都从一块完全干净的石板开始,这至关重要。
碰巧,分叉与线程的交互非常糟糕,因为fork()
仅复制了调用fork()
的线程。任何创建线程的代码都不会在子级中找到它,但是其数据结构将指示该线程已成功创建。即使您编写的代码不是多线程的,这也可能会咬住您-使用创建帮助程序线程的库作为实现细节就足够了。分叉还与互斥锁交互:在不相关的线程持有互斥锁的情况下,可能会发生对fork()
的调用。当子进程调用需要互斥量的代码并尝试获取该互斥量时,它将死锁。 POSIX试图提供解决mitigate这些问题的方法,但是这些变通办法极为困难,而且经常无法完全正确地使用。苹果提供的MacOS系统库甚至没有尝试,因此Python开发人员gave up刚刚在MacOS上产生了默认的多处理工作程序启动方法。从Python 3.4开始,multiprocessing.set_start_method
可用于在任何操作系统上请求基于生成的工作程序创建方法。
除了与线程的交互之外,asyncio还为fork提供了自己的挑战。异步事件循环使用线程池来阻止内部使用的操作,这些操作可能在子代中被破坏。它使用管道来实现call_soon_threadsafe
(以及扩展名run_in_executor
,它使用相同的机制在完成工作时向事件循环发出警报),以及异步安全信号处理程序。这些管道是由子进程继承的,因此其对管道的写入最终可能会被父进程甚至同级拾取。在派生的孩子中尝试使用相同事件循环几乎肯定是一个错误。可能的工作是创建一个新的事件循环,但奇怪的是,这种用法从未经过开发人员的系统测试,并且事实上不受支持。
如果要在工作程序中使用asyncio,则建议切换到spawn
或forkserver
方法来创建多处理工作程序。