为什么工作完成后不能加入进程?

问题描述

我在一个班级中有一个 Queue一个 Manager一个 Lock。该类有一个运行函数,它启动 6 Process 并且它们都等待 exit_flag 为真以结束它们的工作。但是,我无法“结束”作业,因为当我在每个作业上调用 join 方法时,它会阻止它。代码如下:

from multiprocessing import Process,Lock,Queue,Manager


class MyClass():
     def __init__(self):
          self.q = Queue(maxsize=50)
          self.lock = Lock()
          manager = Manager()
          self.manager = manager.dict()

     def fill_queue(self,idx):
          while not self.exit():
               #do something
               result,result_type= self.perform_extraction()
               if result_type not in self.manager():
                    self.manager[result_type] = []
               while self.q.full() and not self.exit():
                    sleep(10)
               if self.exit():
                    print('Exit filler')
                    break
               self.lock.acquire()
               self.q.put((result,result_type))
               self.lock.release()
          else:
               print(f'queue filler {idx} ended')

     def empty_queue(self,idx):
          while not self.exit():
               if self.q.emtpy():
                    continue
               self.lock.aquire()
               result,result_type = self.q.get()
               self.lock.release()
               result,id = self.perform_test(queue_value)
               if result >=0 and result not in self.manager[result_type]:
                    self.manager[id] += [(result,result_type)]
               self.insert_to_database(result,result_type) --> this inserts the value into a sqlite3 ddbb
          else:
               print(f'worker {idx} ended')


     def run(self,n_workers):
          jobs = []
          for _ in range(2):
               p = Process(target = self.fill_queue,args=(_,))
               jobs.append(p)

          for _ in range(n_workers):
               p = Process(target = self.empty_queue,))
               jobs.append(p)

          for job in jobs:
               job.start()

          for idx,job in enumerate(jobs):
               print(f'joining job {idx}')
               job.join()
               if not job.is_alive():
                    print(f'closing job {idx}')
                    job.close()
               else:
                    print(f'job {idx} still alive')

if __name__ == '__main__':
     mc = MyClass()
     mc.run(n_workers = 4)
     print('RUN ENDED!')

manager 用于进程之间的通信,当满足条件并且 manager 中有 X 个元素时,self.exit() 函数返回 True

当我运行此代码时,它会卡住打印 joining job 0 并且永远停留在那里,我不知道为什么。如果我添加一些超时并将其设置为 job.join(5)(5 任意值,没有真正的原因),它会打印:

joining job 0
job 0 still alive
joining job 1
job 1 still alive
joining job 2
closing job 2
joining job 3
closing job 3
joining job 4
closing job 4
joining job 5
closing job 5
RUN ENDED!

代码没有完成。如果作业仍然存在,我还尝试执行 job.terminate() 并且这引发了一个错误,告诉我无法找到某些泄露的文件夹。这是否意味着我有一些僵尸进程?

为什么会这样?我做错了什么?


EDIT添加了一些与 manager() 交互的逻辑。我正在使用管理器添加几种类型的结果并将相同类型的所有结果附加到列表中,因此 dict 结构类似于 {result_type:[result_values]} 并且使用管理器的原因是为了避免存储重复的结果和检查算法何时满足退出条件。

现在,退出条件函数如下所示:

def exit(self):
     for v in self.manager.values():
          if len(v) < 10:
               return False
     return True

因此,当我在每种类型的每个列表上都有 10 个项目时,所有进程都会结束。有 3 种可能的不同类型,因此一旦将每种类型添加为管理器键,只需填充它,当所有 3 种都至少有 10 个值(可能还有更多)时,所有作业都应该结束。


编辑 2:向 fill_queueempty_queue 函数添加了一些打印信息。打印出来的是:

worker 0 结束 工人 1 结束 队列填充 0 结束 工人 2 结束 队列填充 1 结束 加入工作 0 --> * 工人 3 结束

  • 所以这通常(像往常一样)在所有工作人员打印“结束”语句之前打印,但它永远不会加入第一个作业。这实际上在 join 循环中第一次尝试调用 for idx,job in enumerate(jobs): 方法时卡住了。

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...