问题描述
我经常使用以下模式并行化python中的任务。我这样做是因为填充输入队列很快,并且一旦进程启动并异步运行,我就可以在循环中调用阻塞的get()并在准备好结果时将其取出。对于需要几天的任务,这很棒,因为我可以执行报告进度之类的事情。
from multiprocessing import Process,Queue
class worker():
def __init__(self,init_dict,):
self.init_dict = init_dict
def __call__(self,task_queue,done_queue):
for task_args in task_queue.get()
task_result = self.do_work(task_args)
done_queue.put(task_result)
if __name__=="__main__":
n_threads = 8
init_dict = {} # whatever we need to setup our class
worker_class = worker(init_dict)
task_queue = Queue()
done_queue = Queue()
some_iterator = [1,2,3,4,5] # or a list of files to chew through normally
for task in some_iterator:
task_queue.put(task)
for i in range(n_threads):
Process(target=worker_class,args=(task_queue,done_queue)).start()
for i in range(len(some_iterator)):
result = done_queue.get()
# do something with result
# print out progress stats,whatever,as tasks complete
我已经掩盖了一些细节,例如捕获错误,处理失败的事件,杀死僵尸进程,在任务队列末尾退出并捕获回溯,但是您明白了。我真的很喜欢这种模式,它非常适合我的需求。我有很多使用它的代码。
我需要更多的计算能力,并且希望将工作分散到整个集群中。 Ray提供了一个多处理池,其API与python多处理的API相匹配。我只是想不出如何使上述模式起作用。我主要得到: RuntimeError:队列对象仅应通过继承在进程之间共享
有人对使用池而不是n个单独的进程时如何从队列中获取结果有任何建议吗?
我很欣赏如果我进行大量重写,那么可能还有其他方法可以从ray中获得我想要的东西,但是我有很多这样的代码,因此希望尽量减少更改。
谢谢
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)