使用两个队列作为生产者-消费者模式的 Python 多处理可能出现死锁?

问题描述

我想知道以下代码中是否存在某种死锁。我必须读取数据库的每个元素(大约 100 万个项目),对其进行处理,然后将结果收集到一个唯一的文件中。

我使用两个队列和三种类型的进程通过多处理并行执行:

  • Reader:读取数据库并将读取的项目添加到task_queue中的主进程
  • Worker:进程池。每个工作人员从 task_queue 中获取一个项目,处理该项目,将结果保存在存储在 item_name/item_name.txt 中的中间文件中,并将 item_name 放入 Completed_queue
  • Writer:进程从completed_queue获取item_name,从item_name/item_name.txt获取中间结果,写入results.txt
from multiprocessing import Pool,Process,Queue
class computation():

    def __init__(self,K):
        self.task_queue = Queue()
        self.completed_queue = Queue()
        self.n_cpus = K

    def reader(self,):
        with open(db,"r") as db:
            ... # Read an item
            self.task_queue.put(item)
            
    def worker(self,):
        while True:
            item = self.task_queue.get(True)
            if item == "STOP":
                break
            self.process_item(item)

    def writer_process(self,):
        while True:
            f = self.completed_queue.get(True)
            if f == "DONE":
               break
            self.write_f(f)

    def run(self,):
        pool = Pool(n_cpus,self.worker,args=())
        
        writer = Process(target=self.writer_process,args=())
        writer.start()

        self.reader()

        pool.close()
        pool.join()

        self.completed_queue.put("DONE")
        writer.join()

代码有效,但似乎有时编写器或池停止工作(或者它们非常慢)。在这种情况下是否可能出现死锁?

解决方法

您的代码存在一些问题。首先,通过按原样使用队列,您实际上是在创建自己的进程池,而根本不需要使用 multiprocessing.Pool 类。您正在使用池初始值设定项作为实际的池工作者,这有点滥用此类;您最好只使用常规的 Process 实例(无论如何,我的意见)。

其次,尽管您将消息 DONE 发送到 writer_process 以通知它终止是很好的,但您没有对 self.n_cpus {{1} } 进程,这些进程正在寻找“STOP”消息,因此 worker 函数需要将 reader self.n_cpus 消息放入任务队列:

STOP

就个人而言,我不会使用“STOP”和“DONE”作为哨兵​​m>消息,而是使用 from multiprocessing import Process,Queue class Computation(): def __init__(self,K): self.task_queue = Queue() self.completed_queue = Queue() self.n_cpus = K def reader(self,): with open(db,"r") as db: ... # Read an item self.task_queue.put(item) # signal to the worker processes to terminate: for _ in range(self.n_cpus): self.task_queue.put('STOP') def worker(self,): while True: item = self.task_queue.get(True) if item == "STOP": break self.process_item(item) def writer_process(self,): while True: f = self.completed_queue.get(True) if f == "DONE": break self.write_f(f) def run(self): processes = [Process(target=self.worker) for _ in range(self.n_cpus)] for p in processes: p.start() writer = Process(target=self.writer_process,args=()) writer.start() self.reader() for p in processes: p.join() self.completed_queue.put("DONE") writer.join() 来代替,假设这不是有效的实际消息。我已经测试了上面的代码,其中 None 只是处理列表中的字符串,reader 只是简单地将 'done' 附加到这些字符串中的每一个,并将修改后的字符串放在 self.process_item(item) 上并替换了 { {1}} 在带有 completed_queue 调用的 self.write_f 中。我没有看到代码有任何问题。

更新以使用托管队列

免责声明:我没有使用 mpi4py 的经验,也不知道队列代理如何分布在不同的计算机上。上面的代码可能不够充分,如以下文章 How to share mutliprocessing queue object between multiple computers 所建议的那样。 但是,该代码正在创建 Queue.Queue 的实例(该代码是 Python 2 代码),不是 multiprocessing.SyncManager 返回的代理。 这方面的文档很差。尝试上面的更改,看看它是否效果更好(会更慢)。

因为writer_process返回的代理,我不得不重新整理一下代码;队列现在作为参数显式传递给进程函数:

print