并发期货:何时以及如何实施?

问题描述

from concurrent.futures import ProcesspoolExecutor
from concurrent.futures import as_completed
import numpy as np
import time

#creating iterable
testDict = {}
for i in range(1000):
    testDict[i] = np.random.randint(1,10)
    

#default method
stime = time.time()    
newdict = []

for k,v in testDict.items():
    for i in range(1000):
        v = np.tanh(v)
    newdict.append(v)
    
etime = time.time()
print(etime - stime) 
#output: 1.1139910221099854  



#multi processing
stime = time.time()
testresult = []

def f(item):
    x = item[1]
    for i in range(1000):
        x = np.tanh(x)
    return x

def main(testDict):
    with ProcesspoolExecutor(max_workers = 8) as executor:
        futures = [executor.submit(f,item) for item in testDict.items()]
        for future in as_completed(futures):
            testresult.append(future.result())
            
if __name__ == '__main__':
    main(testDict)    

etime = time.time()
print(etime - stime)
#output: 3.4509658813476562

学习多处理和测试内容。进行测试以检查我是否正确实现了这一点。从输出时间来看,并发方法要慢3倍。那怎么了?

我的目标是并行处理一个脚本,该脚本主要在大约500个项目的字典上运行。每个循环将处理和更新这500个项目的值。这个循环可以说是5000代。 k,v对中没有一个与其他k,v对交互。 [它是一种遗传算法]。

我还在寻找有关如何并行化上述目标的指南。如果我在遗传算法代码中对每个函数使用正确的并发期货方法,其中每个函数需要输入一个字典并输出一个新字典,这会有用吗?任何指南/资源/帮助表示赞赏。

编辑:如果我运行以下示例:https://docs.python.org/3/library/concurrent.futures.html#processpoolexecutor-example,则解决方法要比认的循环检查多3倍。

解决方法

这里有几个基本问​​题,您使用的是numpy,但您没有对计算进行向量化。在这里编写代码的方式不会使您受益于numpy的速度优势,还可能只使用标准库math模块,对于这种类型的代码,它比numpy更快:

# 0.089sec
import math
for k,v in testDict.items():
    for i in range(1000):
        v = math.tanh(v)
    newdict.append(v)

对操作进行矢量化处理后,才可以看到numpy的好处:

# 0.016sec
for k,v in testDict.items():
    arr = no.full(1000,v)
    arr2 = np.tanh(arr)
    newdict.append(arr2[-1])

为进行比较,您的原始单线程代码在我的测试机上的运行时间为1.171秒。如您所见,如果使用不正确,NumPy可能比纯Python慢​​几个数量级。

现在继续说明为什么看到自己的内容。

说实话,我无法复制您的计时结果。对于我来说,您的原始多处理代码在macOS(在Python 3.6上)的运行时间为0.299秒,比单进程代码要快。但是,如果我不得不猜测,您可能正在使用Windows?在Windows等平台上,创建子进程并设置环境来运行多处理任务非常昂贵,因此对持续时间少于几秒钟的任务使用多处理具有可疑的好处。如果您对为什么感兴趣,请read here

此外,在缺少可用的fork()的平台(如Python 3.8或Windows之后的MacOS)中,当您使用多处理时,子进程必须重新导入该模块,因此,如果将两个代码都放在同一个文件中,则必须在子进程中运行单线程代码,然后才能运行多处理代码。您可能需要将测试代码放入函数中,并使用if __name__ == "__main__"块保护顶级代码。在不使用Mac的非fork-safe框架库的情况下,在使用Python 3.8或更高版本的Mac上,还可以通过调用multiprocessing.set_start_method("fork")来恢复使用fork方法。

解决了这个问题,继续您的标题问题。

使用多重处理时,需要将数据复制到子流程,然后再复制到主流程以检索结果,并且产生子流程会产生一定的成本。要从多处理中受益,您需要设计工作负载,以使这部分成本可以忽略不计。

如果您的数据来自外部来源,请尝试将数据加载到子进程中,而不是让主进程加载数据,然后再将其传输到子进程,让主进程告诉子进程如何获取其切片数据。在这里,您是在主流程中生成testDict的,因此,如果可以的话,请对其进行并行化,然后将其移至子代。

此外,由于您使用的是numpy,如果正确地向量化了您的操作,则numpy在进行向量化操作时会释放GIL,因此您可以只使用多线程。由于numpy在矢量操作期间不保存GIL,因此您可以在单个Python进程中利用多个线程,并且由于线程共享内存,因此您无需将数据派生或复制到子进程中。