问题描述
多年后,我终于有了一个解决方案(在处理不相关的问题时发现)。我已经在Python 3.7、3.8和3.9上进行了测试。
临时 。您只需在调用周围执行此操作process.start()
。sys.warnoptions
被记录为您不应手动修改的实现细节;的官方建议是在使用的功能warnings
模块,并设置PYTHONWARNINGS
在os.environ
。这行不通。似乎唯一起作用的是打补丁sys.warnoptions
。在测试中,您可以执行以下操作:
import multiprocessing
from unittest.mock import patch
p = multiprocessing.Process(target=my_function)
with patch('sys.warnoptions', []):
p.start()
p.join()
如果您不想使用unittest.mock
,请手动打补丁:
import multiprocessing
import sys
p = multiprocessing.Process(target=my_function)
old_warnoptions = sys.warnoptions
try:
sys.warnoptions = []
p.start()
finally:
sys.warnoptions = old_warnoptions
p.join()
解决方法
太长; 没看
该warnings.catch_warnings()
上下文管理是不是线程安全的。如何在并行处理环境中使用它?
背景
以下代码使用Pythonmultiprocessing
模块的并行处理解决了最大化问题。它获取一个(不可变的)小部件列表,将它们进行分区,找到所有分区的最大值(“finalists”),然后找到最大值(“ champion”
)的“决赛选手”。如果我正确理解了自己的代码(如果我理解了我的代码,那么我就不会在这里),我将与所有子进程共享内存以为它们提供输入小部件,并multiprocessing
使用操作系统级管道和酸洗来发送当工作人员完成后,决赛选手的小部件将返回到主要流程。
问题的根源
我想捕捉在小 部件从进程间管道中脱出 后 发生的小 部件重新实例化后,由小部件的重新实例化引起的冗余小部件警告
。实例化窗口小部件对象时,它们将验证自己的数据,并从Python标准warnings
模块发出警告,以告知应用程序用户该窗口小部件怀疑用户的输入数据存在问题。因为解开导致对象实例化,所以我对代码的理解意味着,每个小部件对象只有在退出管道后才是入围者,才被精确地实例化一次-
请参阅下一节以了解为什么这是不正确的。
这些小部件在被弄成碎片之前就已经创建了,因此用户已经很痛苦地意识到自己输入了错误的内容,并且不想再次听到。这些是我想通过warnings
模块的catch_warnings()
上下文管理器(即with
语句)捕获的警告。
解决方案失败
在我的测试中,我将范围缩小了何时发出多余的警告到我在下面标记为 A行 和 B 行
之间的任何位置。令我感到惊讶的是,警告声不在附近的其他地方发出output_queue.get()
。对我来说,这意味着要multiprocessing
使用酸洗将小部件发送给工人。
结果是,放置一个由warnings.catch_warnings()
甚至从 A行 到 B 行的
所有内容创建的上下文管理器,并在此上下文内设置正确的警告过滤器,都不会捕获警告。对我而言,这意味着警告是在工作进程中发出的。将此上下文管理器放在工作程序代码周围也不会捕获警告。
编码
此示例省略了代码,以决定问题的大小是否太小而无法打扰分叉过程,导入多处理程序以及定义my_frobnal_counter
和my_load_balancer
。
"Call `frobnicate(list_of_widgets)` to get the widget with the most frobnals"
def frobnicate_parallel_worker(widgets,output_queue):
resultant_widget = max(widgets,key=my_frobnal_counter)
output_queue.put(resultant_widget)
def frobnicate_parallel(widgets):
output_queue = multiprocessing.Queue()
# partitions: Generator yielding tuples of sets
partitions = my_load_balancer(widgets)
processes = []
# Line A: Possible start of where the warnings are coming from.
for partition in partitions:
p = multiprocessing.Process(
target=frobnicate_parallel_worker,args=(partition,output_queue))
processes.append(p)
p.start()
finalists = []
for p in processes:
finalists.append(output_queue.get())
# Avoid deadlocks in Unix by draining queue before joining processes
for p in processes:
p.join()
# Line B: Warnings no longer possible after here.
return max(finalists,key=my_frobnal_counter)