在多处理队列上使用 islice 时阻止脚本挂起

问题描述

我有一个可迭代的 Python 类,它包含一个多处理生成器。有些用例只需要生成一个子集,所以它被包裹在 islice 中。

但是,当使用 islice 时调用挂起,我猜是由于底层多处理进程没有意识到事情已经结束。

一个最低限度的功能示例如下:

from itertools import islice
import multiprocessing as mp

STOP_MSG = 'STOP!'

def generator(queue,max_val):
  for i in range(max_val):
    queue.put(i)
  queue.put(STOP_MSG)

class GeneratorMPProc:
  def __init__(self,max_val):
    self.max_val = max_val

  def __iter__(self):
    queue = mp.Queue()
    Feeder_process = mp.Process(
      target=generator,args=(
        queue,self.max_val,))
    Feeder_process.start()
    msg = queue.get()
    while msg != STOP_MSG:
      yield msg
      msg = queue.get()
    Feeder_process.join()

if __name__ == '__main__':
  max_val = 0xFFFFFFFFF
  end_val = 10

  psm = GeneratorMPProc(max_val)
  rsm = [i for i in islice(psm,end_val)]

如何解决这个问题,以便即使在使用 islice 或任何子集选择器时它也能正确终止?

解决方法

您的 isllice 调用在 GeneratorMPProc.iter 返回之前不会返回,并且 max_val 设置为 0xFFFFFFFFF(写入队列不是最快的操作,这也会消耗一些资源)。换句话说,“事情还没有结束”,直到您的生成器函数结束,因此您的 multiprocess.Process 实际上结束并可以加入。

max_val 设置为诸如 20 之类的值,您的程序将很容易终止。


if __name__ == '__main__':
    #max_val = 0xFFFFFFFFF
    max_val = 20
    end_val = 10

    psm = GeneratorMPProc(max_val)
    rsm = [i for i in islice(psm,end_val)]
    print(rsm)

打印:

[0,1,2,3,4,5,6,7,8,9]

更新

您可能需要考虑使用额外的参数 daemon=True 启动“生成器”进程,使其成为守护进程,然后从方法 feeder_process.join() 中完全删除 __iter__。即使使用您原来的 max_val,该代码也能正常工作。

  def __iter__(self):
    queue = mp.Queue()
    feeder_process = mp.Process(
      target=generator,args=(
        queue,self.max_val,),daemon=True
    )
    feeder_process.start()
    msg = queue.get()
    while msg != STOP_MSG:
      yield msg
      msg = queue.get()

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...