问题描述
我有一个可迭代的 Python 类,它包含一个多处理生成器。有些用例只需要生成的一个子集,所以它被包裹在 islice 中。
但是,当使用 islice 时调用挂起,我猜是由于底层多处理进程没有意识到事情已经结束。
from itertools import islice
import multiprocessing as mp
STOP_MSG = 'STOP!'
def generator(queue,max_val):
for i in range(max_val):
queue.put(i)
queue.put(STOP_MSG)
class GeneratorMPProc:
def __init__(self,max_val):
self.max_val = max_val
def __iter__(self):
queue = mp.Queue()
Feeder_process = mp.Process(
target=generator,args=(
queue,self.max_val,))
Feeder_process.start()
msg = queue.get()
while msg != STOP_MSG:
yield msg
msg = queue.get()
Feeder_process.join()
if __name__ == '__main__':
max_val = 0xFFFFFFFFF
end_val = 10
psm = GeneratorMPProc(max_val)
rsm = [i for i in islice(psm,end_val)]
如何解决这个问题,以便即使在使用 islice 或任何子集选择器时它也能正确终止?
解决方法
您的 isllice
调用在 GeneratorMPProc.iter
返回之前不会返回,并且 max_val
设置为 0xFFFFFFFFF
(写入队列不是最快的操作,这也会消耗一些资源)。换句话说,“事情还没有结束”,直到您的生成器函数结束,因此您的 multiprocess.Process
实际上结束并可以加入。
将 max_val
设置为诸如 20 之类的值,您的程序将很容易终止。
if __name__ == '__main__':
#max_val = 0xFFFFFFFFF
max_val = 20
end_val = 10
psm = GeneratorMPProc(max_val)
rsm = [i for i in islice(psm,end_val)]
print(rsm)
打印:
[0,1,2,3,4,5,6,7,8,9]
更新
您可能需要考虑使用额外的参数 daemon=True
启动“生成器”进程,使其成为守护进程,然后从方法 feeder_process.join()
中完全删除 __iter__
。即使使用您原来的 max_val
,该代码也能正常工作。
def __iter__(self):
queue = mp.Queue()
feeder_process = mp.Process(
target=generator,args=(
queue,self.max_val,),daemon=True
)
feeder_process.start()
msg = queue.get()
while msg != STOP_MSG:
yield msg
msg = queue.get()