await run_in_executor在同步功能终止时无法解除阻止

问题描述

考虑以下简单的自包含python3程序:

import asyncio
import multiprocessing
import time
loop = asyncio.get_event_loop()

def sub_process(event,i):
    async def async_function():
        await loop.run_in_executor(None,event.wait)  # <-- problematic line
        print(f"successfully awaited on process {i}!")  # <-- not all processes reach this line!
    loop.run_until_complete(async_function())


if __name__ == '__main__':
    event = multiprocessing.Event()
    processes = [multiprocessing.Process(target=sub_process,args=(event,i)) for i in range(multiprocessing.cpu_count())]
    for process in processes:
        process.start()
    time.sleep(2)
    event.set()
    for process in processes:
        process.join()
    print("success.")

我遇到了不确定性故障,其中某些(但不是全部)过程无法通过“有问题的”行,即,即使在设置事件之后也无法进行。未能解除阻止的进程数量也是不确定的(通常在我的16vcpu AWS实例上为12-16之间)。我能够在Linux机器(而非MacOS)上重现此内容

asyncio相比,multiprocessing的问题似乎更多。我可以这样说是因为,如果我用包装了event.wait的简单同步函数替换event.wait(),则同步函数确实继续经过event.wait()调用,即: / p>

def sub_process(event,i):
    def sync_function():
        event.wait()
        print("this line is reached...")
    async def async_function():
        await loop.run_in_executor(None,sync_function)
        print("...but this line is not.")
    loop.run_until_complete(async_function())

当恰好一个进程无法解除阻塞并且我通过调用ctrl + C终止时,我得到以下堆栈跟踪:

Process Process-16:
Traceback (most recent call last):
  File "temp.py",line 21,in <module>
    process.join()
  File "/usr/lib/python3.8/multiprocessing/process.py",line 149,in join
    res = self._popen.wait(timeout)
  File "/usr/lib/python3.8/multiprocessing/popen_fork.py",line 47,in wait
    return self.poll(os.WNOHANG if timeout == 0.0 else 0)
  File "/usr/lib/python3.8/multiprocessing/popen_fork.py",line 27,in poll
Traceback (most recent call last):
    pid,sts = os.waitpid(self.pid,flag)
  File "/usr/lib/python3.8/multiprocessing/process.py",line 315,in _bootstrap
    self.run()
  File "/usr/lib/python3.8/multiprocessing/process.py",line 108,in run
    self._target(*self._args,**self._kwargs)
KeyboardInterrupt
  File "temp.py",line 10,in sub_process
    loop.run_until_complete(async_function())
  File "/usr/lib/python3.8/asyncio/base_events.py",line 603,in run_until_complete
    self.run_forever()
  File "/usr/lib/python3.8/asyncio/base_events.py",line 570,in run_forever
    self._run_once()
  File "/usr/lib/python3.8/asyncio/base_events.py",line 1823,in _run_once
    event_list = self._selector.select(timeout)
  File "/usr/lib/python3.8/selectors.py",line 468,in select
    fd_event_list = self._selector.poll(timeout,max_ev)
KeyboardInterrupt

任何见识将不胜感激!谢谢。

编辑:在multiprocessing.set_start_method('spawn')下方添加if __name__ == '__main__':似乎可以解决该问题;谢谢@ user4815162342。我仍然想更多地了解正在发生的事情。

解决方法

自我回答:在multiprocessing.set_start_method('spawn')下方添加if __name__ == '__main__'可解决此问题。感谢@ user4815162342指出这一点。

原因似乎与多线程和派生的一些底层不兼容有关;例如,“请注意,安全地分叉多线程进程是有问题的。” (来自multiprocessing docs)。请注意,loop.run_in_executor在后台使用ThreadPoolExecutor;这就是为什么该程序首先是多线程的(可能不会立即显现出来)。

似乎在https://bugs.python.org/issue33725上提供了更多详细信息。尽管这些细节对我来说完全不为人所知,但是MacOS似乎并没有为派生多线程进程提供良好的底层支持。

,

TL; DR multiprocessing.set_start_method('spawn')放在脚本顶部,以解决Linux上的问题。由于spawn是那里的默认启动方法,因此无法在MacOS上重现该问题。

类似Unix的操作系统本身为并行提供fork primitive。对fork()的调用有效地复制了当前进程并继续执行两者。 fork()仍被广泛用于执行其他程序,其中对fork()的调用后紧跟exec()。如今,派生已很少用于并行性,在很大程度上由多线程取代,在多线程中,所有线程共享内存,并且不需要昂贵的进程间通信。但是,fork()似乎是multiprocessing的理想工具,它需要多个单独的进程运行同一可执行文件(Python解释器),这正是fork所提供的。分叉子进程在父进程离开的地方继续进行的事实也是一个好处,因为父进程中加载​​的所有Python模块都会自动出现在子进程中。

在类似Unix的系统上,multiprocessing通过派生当前进程并继续运行侦听输入队列的代码来创建工作进程,以准备执行用户提供的作业。在不提供fork()的Windows上,它使用不同的方法:它通过执行全新的python.exe来创建工作程序,并使用命令行参数和环境变量来指示其引导至多处理工人。基于Spawn的方法的缺点是初始化工作池的速度明显慢,因为它需要池中每个工作人员使用一个全新的Python实例,这对于导入大型库的进程可能会花费很多时间。它也需要更多的内存,因为与fork不同,fork部署copy-on-write来保存在重复的内存页上,运行新的可执行文件在父级和子级之间几乎没有任何共享。另一方面,产卵的好处是每个工人都从一块完全干净的石板开始,这至关重要。

碰巧,分叉与线程的交互非常糟糕,因为fork()仅复制了调用fork()的线程。任何创建线程的代码都不会在子级中找到它,但是其数据结构将指示该线程已成功创建。即使您编写的代码不是多线程的,这也可能会咬住您-使用创建帮助程序线程的库作为实现细节就足够了。分叉还与互斥锁交互:在不相关的线程持有互斥锁的情况下,可能会发生对fork()的调用。当子进程调用需要互斥量的代码并尝试获取该互斥量时,它将死锁。 POSIX试图提供解决mitigate这些问题的方法,但是这些变通办法极为困难,而且经常无法完全正确地使用。苹果提供的MacOS系统库甚至没有尝试,因此Python开发人员gave up刚刚在MacOS上产生了默认的多处理工作程序启动方法。从Python 3.4开始,multiprocessing.set_start_method可用于在任何操作系统上请求基于生成的工作程序创建方法。

除了与线程的交互之外,asyncio还为fork提供了自己的挑战。异步事件循环使用线程池来阻止内部使用的操作,这些操作可能在子代中被破坏。它使用管道来实现call_soon_threadsafe(以及扩展名run_in_executor,它使用相同的机制在完成工作时向事件循环发出警报),以及异步安全信号处理程序。这些管道是由子进程继承的,因此其对管道的写入最终可能会被父进程甚至同级拾取。在派生的孩子中尝试使用相同事件循环几乎肯定是一个错误。可能的工作是创建一个新的事件循环,但奇怪的是,这种用法从未经过开发人员的系统测试,并且事实上不受支持。

如果要在工作程序中使用asyncio,则建议切换到spawnforkserver方法来创建多处理工作程序。