如何使用python多处理池做到这一点

问题描述

我经常使用以下模式并行化python中的任务。我这样做是因为填充输入队列很快,并且一旦进程启动并异步运行,我就可以在循环中调用阻塞的get()并在准备好结果时将其取出。对于需要几天的任务,这很棒,因为我可以执行报告进度之类的事情。

from multiprocessing import Process,Queue
class worker():
    def __init__(self,init_dict,):
        self.init_dict = init_dict

    def __call__(self,task_queue,done_queue):
        for task_args in task_queue.get()
            task_result = self.do_work(task_args)
            done_queue.put(task_result)

if __name__=="__main__":
    n_threads = 8
    init_dict = {} # whatever we need to setup our class
    worker_class = worker(init_dict)

    task_queue = Queue()
    done_queue = Queue()

    some_iterator = [1,2,3,4,5] # or a list of files to chew through normally
    for task in some_iterator:
       task_queue.put(task)

    for i in range(n_threads):
        Process(target=worker_class,args=(task_queue,done_queue)).start()

    for i in range(len(some_iterator)):
        result = done_queue.get()
        # do something with result
        # print out progress stats,whatever,as tasks complete

我已经掩盖了一些细节,例如捕获错误,处理失败的事件,杀死僵尸进程,在任务队列末尾退出并捕获回溯,但是您明白了。我真的很喜欢这种模式,它非常适合我的需求。我有很多使用它的代码

我需要更多的计算能力,并且希望将工作分散到整个集群中。 Ray提供了一个多处理池,其API与python多处理的API相匹配。我只是想不出如何使上述模式起作用。我主要得到: RuntimeError:队列对象仅应通过继承在进程之间共享

有人对使用池而不是n个单独的进程时如何从队列中获取结果有任何建议吗?

我很欣赏如果我进行大量重写,那么可能还有其他方法可以从ray中获得我想要的东西,但是我有很多这样的代码,因此希望尽量减少更改。

谢谢

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)