Problem Description
It's possible to build such a combined parallel generator with some effort, but as @jasonharper noted, you definitely need to assemble the sub-generators inside the child processes, since a running generator cannot be pickled.

The pattern below is reusable; only the generator function gen() is tailored to this example. The design uses a multiprocessing.SimpleQueue to return generator results to the parent and a multiprocessing.Barrier for synchronization.

Calling Barrier.wait() blocks the caller (a thread in any process) until the specified number of parties has called .wait(), whereupon all threads currently waiting on the Barrier are released simultaneously. The Barrier here ensures that further generator results are computed only after the parent has received all results from the current iteration, which can be desirable for keeping overall memory consumption in check.

The number of parallel workers equals the number of argument tuples you provide in the gen_args_tuples iterable, so gen_args_tuples=zip(range(4)) uses four workers, for example. See the comments in the code for further details.
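As an aside, the blocking behaviour of Barrier.wait() is easy to demonstrate in isolation. multiprocessing.Barrier mirrors the threading.Barrier API, so a thread-based sketch (three illustrative workers, staggered arrival times; all names here are made up for the demo) shows the "all parties released at once" semantics:

```python
import threading
import time

barrier = threading.Barrier(parties=3)  # 3 parties must call .wait()
release_times = []

def worker(i):
    time.sleep(0.2 * i)   # threads arrive at different times...
    barrier.wait()        # ...but each blocks here until all 3 have arrived
    release_times.append(time.monotonic())

threads = [threading.Thread(target=worker, args=(i,)) for i in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Despite staggered arrivals, all three were released together:
print(max(release_times) - min(release_times) < 0.1)  # True
```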
import multiprocessing as mp

SENTINEL = 'SENTINEL'


def gen(a):
    """Your individual generator function."""
    lst = ['a', 'b', 'c']
    for ch in lst:
        for _ in range(int(10e6)):  # some dummy computation
            pass
        yield ch + str(a)


def _worker(i, barrier, queue, gen_func, gen_args):
    for x in gen_func(*gen_args):
        print(f"WORKER-{i} sending item.")
        queue.put((i, x))
        barrier.wait()
    queue.put(SENTINEL)


def parallel_gen(gen_func, gen_args_tuples):
    """Construct and yield from parallel generators
    built from `gen_func(gen_args)`.
    """
    gen_args_tuples = list(gen_args_tuples)  # ensure list
    n_gens = len(gen_args_tuples)
    sentinels = [SENTINEL] * n_gens
    queue = mp.SimpleQueue()
    barrier = mp.Barrier(n_gens + 1)  # `parties`: + 1 for parent
    processes = [
        mp.Process(target=_worker, args=(i, barrier, queue, gen_func, args))
        for i, args in enumerate(gen_args_tuples)
    ]
    for p in processes:
        p.start()
    while True:
        results = [queue.get() for _ in range(n_gens)]
        if results != sentinels:
            results.sort()
            yield tuple(r[1] for r in results)  # sort and drop ids
            barrier.wait()  # all workers are waiting already,
                            # so this will unblock immediately
        else:
            break
    for p in processes:
        p.join()


if __name__ == '__main__':
    for res in parallel_gen(gen_func=gen, gen_args_tuples=zip(range(4))):
        print(res)

Example output:
WORKER-1 sending item.
WORKER-0 sending item.
WORKER-3 sending item.
WORKER-2 sending item.
('a0', 'a1', 'a2', 'a3')
WORKER-1 sending item.
WORKER-2 sending item.
WORKER-3 sending item.
WORKER-0 sending item.
('b0', 'b1', 'b2', 'b3')
WORKER-2 sending item.
WORKER-3 sending item.
WORKER-1 sending item.
WORKER-0 sending item.
('c0', 'c1', 'c2', 'c3')
Process finished with exit code 0
Solution
Suppose I have N generators gen_1, ..., gen_N, each of which yields the same number of values. I want a generator gen that runs gen_1, ..., gen_N in N parallel processes and yields (next(gen_1), next(gen_2), ..., next(gen_N)).

That is, I want:

def gen():
    yield (next(gen_1), ..., next(gen_N))

in such a way that each gen_i runs in its own process. Is it possible to do this? I tried it in the dummy example below without success:
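Minus the processes, the desired semantics is just zip over the sub-generators. A sequential sketch (with a hypothetical sub_gen standing in for the individual gen_i) pins down the target behaviour:

```python
def sub_gen(i):  # hypothetical stand-in for an individual gen_i
    for ch in 'abc':
        yield ch + str(i)

def gen():
    # yields (next(gen_1), ..., next(gen_N)) per step, sequentially
    yield from zip(*(sub_gen(i) for i in range(4)))

print(list(gen()))
# [('a0', 'a1', 'a2', 'a3'), ('b0', 'b1', 'b2', 'b3'), ('c0', 'c1', 'c2', 'c3')]
```

The question is then how to get each next() evaluated in its own process.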
from multiprocessing import Process

A = range(4)

def gen(a):
    B = ['a', 'b', 'c']
    for b in B:
        yield b + str(a)

def target(g):
    return next(g)

processes = [Process(target=target, args=(gen(a),)) for a in A]

for p in processes:
    p.start()
for p in processes:
    p.join()
But I get the error TypeError: cannot pickle 'generator' object.
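The error can be reproduced in a few lines: it is the generator object (a running iterator with frame state) that cannot be pickled, while the generator function itself pickles fine, by reference:

```python
import pickle

def gen():
    yield 1

pickle.dumps(gen)        # OK: the *function* is pickled by qualified name

try:
    pickle.dumps(gen())  # the generator *object* carries frame state
except TypeError as e:
    print(e)             # cannot pickle 'generator' object
```

This is why the sub-generators have to be constructed inside the child processes rather than passed to Process as arguments.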
Edit:

I have adapted @darkonaut's answer to fit my needs, and I'm posting it here in case anyone finds it useful. We first define a couple of utility functions:
from itertools import zip_longest
from typing import Generator, List


def grouper(iterable, n, fillvalue=iter([])):
    "Collect data into fixed-length chunks or blocks."
    args = [iter(iterable)] * n
    return zip_longest(*args, fillvalue=fillvalue)


def split_generators_into_batches(generators: List[Generator], n_splits):
    # chunk size len(generators) // n_splits + 1; the last chunk is padded
    # with an empty iterator so every batch is the same width
    chunks = grouper(generators, len(generators) // n_splits + 1)
    return [zip_longest(*chunk) for chunk in chunks]
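For illustration, here is a quick check of these helpers (restated so the snippet is self-contained), splitting five two-item generators into two batches; the inputs are made up for the demo:

```python
from itertools import zip_longest

def grouper(iterable, n, fillvalue=iter([])):
    "Collect data into fixed-length chunks or blocks."
    args = [iter(iterable)] * n
    return zip_longest(*args, fillvalue=fillvalue)

def split_generators_into_batches(generators, n_splits):
    chunks = grouper(generators, len(generators) // n_splits + 1)
    return [zip_longest(*chunk) for chunk in chunks]

# Five generators, n_splits=2 -> chunk size 5 // 2 + 1 = 3 -> two batches.
gens = [iter(range(2)) for _ in range(5)]
first, second = split_generators_into_batches(gens, n_splits=2)
print(list(first))   # [(0, 0, 0), (1, 1, 1)]
print(list(second))  # [(0, 0, None), (1, 1, None)] -- inner zip_longest pads
```

Note that the short second batch is padded with None by the inner zip_longest, so the padding shows up in the yielded results.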
The following class is responsible for splitting any number of generators into n batches (n = number of processes) and processing them to yield the desired result:
import itertools
import multiprocessing as mp


class GeneratorParallelProcessor:
    SENTINEL = 'S'

    def __init__(self, generators, n_processes=2 * mp.cpu_count()):
        self.n_processes = n_processes
        self.generators = split_generators_into_batches(list(generators), n_processes)
        self.queue = mp.SimpleQueue()
        self.barrier = mp.Barrier(n_processes + 1)
        self.sentinels = [self.SENTINEL] * n_processes
        self.processes = [
            mp.Process(target=self._worker, args=(self.barrier, self.queue, gen))
            for gen in self.generators
        ]

    def process(self):
        for p in self.processes:
            p.start()
        while True:
            results = list(itertools.chain(*(self.queue.get() for _ in self.generators)))
            if results != self.sentinels:
                yield results
                self.barrier.wait()
            else:
                break
        for p in self.processes:
            p.join()

    def _worker(self, barrier, queue, generator):
        for x in generator:
            queue.put(x)
            barrier.wait()
        queue.put(self.SENTINEL)
To use it, simply do the following:

parallel_processor = GeneratorParallelProcessor(generators)

for grouped_generator in parallel_processor.process():
    output_handler(grouped_generator)
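To make this concrete, here is a self-contained sketch of the class in action. It restates the helpers and the class, but makes two assumptions worth flagging: it forces the 'fork' start method (the bound-method target and the unpicklable zip_longest batch objects are then inherited by the children rather than pickled, so this only works on POSIX systems), and it uses a made-up string_gen with four generators and n_processes=2, chosen so that the number of batches equals n_processes (otherwise the Barrier's party count would not match and the program would deadlock):

```python
import itertools
import multiprocessing
from itertools import zip_longest

# Assumption: 'fork' start method (POSIX only), so children inherit the
# unpicklable zip_longest batch objects instead of pickling them.
mp = multiprocessing.get_context('fork')


def grouper(iterable, n, fillvalue=iter([])):
    "Collect data into fixed-length chunks or blocks."
    args = [iter(iterable)] * n
    return zip_longest(*args, fillvalue=fillvalue)


def split_generators_into_batches(generators, n_splits):
    chunks = grouper(generators, len(generators) // n_splits + 1)
    return [zip_longest(*chunk) for chunk in chunks]


class GeneratorParallelProcessor:
    SENTINEL = 'S'

    def __init__(self, generators, n_processes=2 * mp.cpu_count()):
        self.n_processes = n_processes
        self.generators = split_generators_into_batches(list(generators), n_processes)
        self.queue = mp.SimpleQueue()
        self.barrier = mp.Barrier(n_processes + 1)
        self.sentinels = [self.SENTINEL] * n_processes
        self.processes = [
            mp.Process(target=self._worker, args=(self.barrier, self.queue, gen))
            for gen in self.generators
        ]

    def process(self):
        for p in self.processes:
            p.start()
        while True:
            results = list(itertools.chain(*(self.queue.get() for _ in self.generators)))
            if results != self.sentinels:
                yield results
                self.barrier.wait()
            else:
                break
        for p in self.processes:
            p.join()

    def _worker(self, barrier, queue, generator):
        for x in generator:
            queue.put(x)
            barrier.wait()
        queue.put(self.SENTINEL)


def string_gen(i):  # hypothetical stand-in for the real generators
    for ch in 'abc':
        yield ch + str(i)


# Four generators, two processes -> two batches of chunk size 4 // 2 + 1 = 3;
# the second batch is padded with None by zip_longest.
gens = [string_gen(i) for i in range(4)]
rounds = list(GeneratorParallelProcessor(gens, n_processes=2).process())
for r in rounds:
    # queue arrival order varies between runs, so sort for display
    print(sorted(r, key=str))  # e.g. [None, 'a0', 'a1', 'a2', 'a3']
```

Note that within each round the item order depends on queue arrival order, and the None padding from the batching helpers leaks into the results; both are properties of this design, not bugs in the sketch.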