使用多重处理从队列中读取

问题描述

这是我使用Python多重处理从队列中填充和读取的代码:

from multiprocessing import Lock,Process,Queue,Pool

import time
from random import randint

def add_to_queue(tasks_to_accomplish,name):
    while True:
        random_int = randint(0,22)
        print('name',name,"adding",random_int)
        tasks_to_accomplish.put(random_int)
        time.sleep(2)

def read_from_queue(tasks_to_accomplish,name):
    while True:
        item = tasks_to_accomplish.get()
        print('name',item)

        time.sleep(.01)


if __name__ == '__main__':
    tasks_to_accomplish = Queue()

    p = Process(target=add_to_queue,args=(tasks_to_accomplish,"p"))
    p.start()

    p2 = Process(target=read_from_queue,"p2"))
    p2.start()
    p3 = Process(target=read_from_queue,"p3"))
    p3.start()

    p.join()
    p2.join()
    p3.join()

代码将无限执行,这是部分输出:

name p adding 3
name p2 3
name p adding 4
name p3 4
name p adding 0
name p2 0
name p adding 22
name p3 22
name p adding 2
name p2 2
name p adding 13
name p3 13
name p adding 0
name p2 0
name p adding 14
name p3 14
name p adding 20
name p2 20
name p adding 4
name p3 4

从队列中读取所花费的时间为0.01秒:time.sleep(.01)。但是p2和p3进程似乎并没有在0.01秒内读取线程,因为很明显它们阻塞的时间超过了0.01秒。我是否正确实施了进程线程以从队列中读取?

解决方法

正如丹尼尔指出的那样,Queue.get()将一直阻塞,直到默认情况下可用数据为止。

您可以使用q.get(block=True)进行更改,尽管这会raise an exception

name p adding 12
name p2 12
Process Process-6:
Traceback (most recent call last):
  File "/home/user/.pyenv/versions/3.8.0/lib/python3.8/multiprocessing/process.py",line 313,in _bootstrap
    self.run()
  File "/home/user/.pyenv/versions/3.8.0/lib/python3.8/multiprocessing/process.py",line 108,in run
    self._target(*self._args,**self._kwargs)
  File "<ipython-input-2-4e6d57c64980>",line 15,in read_from_queue
    item = tasks_to_accomplish.get(block=False)
  File "/home/user/.pyenv/versions/3.8.0/lib/python3.8/multiprocessing/queues.py",line 110,in get
    raise Empty
_queue.Empty
Process Process-5:
Traceback (most recent call last):
  File "/home/user/.pyenv/versions/3.8.0/lib/python3.8/multiprocessing/process.py",in get
    raise Empty
_queue.Empty
name p adding 2
name p adding 12
name p adding 14
name p adding 21
name p adding 9
name p adding 13

您需要:

def read_from_queue(tasks_to_accomplish,name):
    while True:
        try:
            item = tasks_to_accomplish.get(block=False)
        except:
            print('no data for',name)
        else:
            print('name',name,item)
    
        time.sleep(.01)

获得:

name p adding 0
name p2 0
no data for p3
no data for p3
no data for p2
no data for p2
no data for p3
no data for p2
no data for p3
# about 350 more entries like this
name p adding 5
no data for p2
name p3 5
no data for p2
no data for p3
no data for p3
no data for p2
no data for p3
# ...

除非您需要在两次读取之间做一些工作,否则我会说,您已经正确实现了读取过程(读取时可以安全地删除对sleep的调用)。

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...