问题描述
是的,那是可行的。您的计算不依赖于中间结果,因此您可以轻松地将任务划分为多个块并将其分布在多个流程中。这就是所谓的
。
这里唯一棘手的部分可能是,首先将范围分成相当相等的部分。理顺我的个人lib两个功能来处理此问题:
# mp_utils.py
from itertools import accumulate
def calc_batch_sizes(n_tasks: int, n_workers: int) -> list:
"""Divide `n_tasks` optimally between n_workers to get batch_sizes.
Guarantees batch sizes won't differ for more than 1.
Example:
# >>>calc_batch_sizes(23, 4)
# Out: [6, 6, 6, 5]
In case you're going to use numpy anyway, use np.array_split:
[len(a) for a in np.array_split(np.arange(23), 4)]
# Out: [6, 6, 6, 5]
"""
x = int(n_tasks / n_workers)
y = n_tasks % n_workers
batch_sizes = [x + (y > 0)] * y + [x] * (n_workers - y)
return batch_sizes
def build_batch_ranges(batch_sizes: list) -> list:
"""Build batch_ranges from list of batch_sizes.
Example:
# batch_sizes [6, 6, 6, 5]
# >>>build_batch_ranges(batch_sizes)
# Out: [range(0, 6), range(6, 12), range(12, 18), range(18, 23)]
"""
upper_bounds = [*accumulate(batch_sizes)]
lower_bounds = [0] + upper_bounds[:-1]
batch_ranges = [range(l, u) for l, u in zip(lower_bounds, upper_bounds)]
return batch_ranges
然后您的主脚本将如下所示:
import time
from multiprocessing import Pool
from mp_utils import calc_batch_sizes, build_batch_ranges
def target_foo(batch_range):
return sum(batch_range) # ~ 6x faster than target_foo1
def target_foo1(batch_range):
numbers = []
for num in batch_range:
numbers.append(num)
return sum(numbers)
if __name__ == '__main__':
N = 100000000
N_CORES = 4
batch_sizes = calc_batch_sizes(N, n_workers=N_CORES)
batch_ranges = build_batch_ranges(batch_sizes)
start = time.perf_counter()
with Pool(N_CORES) as pool:
result = pool.map(target_foo, batch_ranges)
r_sum = sum(result)
print(r_sum)
print(f'elapsed: {time.perf_counter() - start:.2f} s')
请注意,我也将for循环切换为range对象的简单总和,因为它提供了更好的性能。如果您无法在实际的应用程序中执行此操作,则列表理解仍比示例中的手动填充列表要快60%。
示例输出:
4999999950000000
elapsed: 0.51 s
Process finished with exit code 0
解决方法
几天来一直在寻找答案,但无济于事。我可能只是不了解其中漂浮的部分,并且multiprocessing
模块上的Python文档相当大,我不清楚。
假设您有以下for循环:
import timeit
numbers = []
start = timeit.default_timer()
for num in range(100000000):
numbers.append(num)
end = timeit.default_timer()
print('TIME: {} seconds'.format(end - start))
print('SUM:',sum(numbers))
输出:
TIME: 23.965870224497916 seconds
SUM: 4999999950000000
对于此示例,您有一个4核处理器。是否有办法总共创建4个进程,其中每个进程都在单独的CPU内核上运行,并且完成速度大约快4倍,所以24s /
4个进程=〜6秒?
以某种方式将for循环划分为4个相等的块,然后将这4个块添加到数字列表中以等于相同的总和?有一个stackoverflow线程:并行简单For循环,但我不明白。谢谢大家