问题描述
我在一个班级中有一个 Queue
、一个 Manager
和一个 Lock
。该类有一个运行函数,它启动 6 Process
并且它们都等待 exit_flag 为真以结束它们的工作。但是,我无法“结束”作业,因为当我在每个作业上调用 join 方法时,它会阻止它。代码如下:
from multiprocessing import Process,Lock,Queue,Manager
class MyClass():
def __init__(self):
self.q = Queue(maxsize=50)
self.lock = Lock()
manager = Manager()
self.manager = manager.dict()
def fill_queue(self,idx):
while not self.exit():
#do something
result,result_type= self.perform_extraction()
if result_type not in self.manager():
self.manager[result_type] = []
while self.q.full() and not self.exit():
sleep(10)
if self.exit():
print('Exit filler')
break
self.lock.acquire()
self.q.put((result,result_type))
self.lock.release()
else:
print(f'queue filler {idx} ended')
def empty_queue(self,idx):
while not self.exit():
if self.q.emtpy():
continue
self.lock.aquire()
result,result_type = self.q.get()
self.lock.release()
result,id = self.perform_test(queue_value)
if result >=0 and result not in self.manager[result_type]:
self.manager[id] += [(result,result_type)]
self.insert_to_database(result,result_type) --> this inserts the value into a sqlite3 ddbb
else:
print(f'worker {idx} ended')
def run(self,n_workers):
jobs = []
for _ in range(2):
p = Process(target = self.fill_queue,args=(_,))
jobs.append(p)
for _ in range(n_workers):
p = Process(target = self.empty_queue,))
jobs.append(p)
for job in jobs:
job.start()
for idx,job in enumerate(jobs):
print(f'joining job {idx}')
job.join()
if not job.is_alive():
print(f'closing job {idx}')
job.close()
else:
print(f'job {idx} still alive')
if __name__ == '__main__':
mc = MyClass()
mc.run(n_workers = 4)
print('RUN ENDED!')
manager
用于进程之间的通信,当满足条件并且 manager
中有 X 个元素时,self.exit()
函数返回 True
。
当我运行此代码时,它会卡住打印 joining job 0
并且永远停留在那里,我不知道为什么。如果我添加一些超时并将其设置为 job.join(5)
(5 任意值,没有真正的原因),它会打印:
joining job 0
job 0 still alive
joining job 1
job 1 still alive
joining job 2
closing job 2
joining job 3
closing job 3
joining job 4
closing job 4
joining job 5
closing job 5
RUN ENDED!
代码没有完成。如果作业仍然存在,我还尝试执行 job.terminate()
并且这引发了一个错误,告诉我无法找到某些泄露的文件夹。这是否意味着我有一些僵尸进程?
为什么会这样?我做错了什么?
EDIT:添加了一些与 manager()
交互的逻辑。我正在使用管理器添加几种类型的结果并将相同类型的所有结果附加到列表中,因此 dict 结构类似于 {result_type:[result_values]}
并且使用管理器的原因是为了避免存储重复的结果和检查算法何时满足退出条件。
def exit(self):
for v in self.manager.values():
if len(v) < 10:
return False
return True
因此,当我在每种类型的每个列表上都有 10 个项目时,所有进程都会结束。有 3 种可能的不同类型,因此一旦将每种类型添加为管理器键,只需填充它,当所有 3 种都至少有 10 个值(可能还有更多)时,所有作业都应该结束。
编辑 2:向 fill_queue
和 empty_queue
函数添加了一些打印信息。打印出来的是:
worker 0 结束 工人 1 结束 队列填充 0 结束 工人 2 结束 队列填充 1 结束 加入工作 0 --> * 工人 3 结束
- 所以这通常(像往常一样)在所有工作人员打印“结束”语句之前打印,但它永远不会加入第一个作业。这实际上在
join
循环中第一次尝试调用for idx,job in enumerate(jobs):
方法时卡住了。
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)