问题描述
from concurrent.futures import ProcesspoolExecutor
from concurrent.futures import as_completed
import numpy as np
import time
#creating iterable
testDict = {}
for i in range(1000):
testDict[i] = np.random.randint(1,10)
#default method
stime = time.time()
newdict = []
for k,v in testDict.items():
for i in range(1000):
v = np.tanh(v)
newdict.append(v)
etime = time.time()
print(etime - stime)
#output: 1.1139910221099854
#multi processing
stime = time.time()
testresult = []
def f(item):
x = item[1]
for i in range(1000):
x = np.tanh(x)
return x
def main(testDict):
with ProcesspoolExecutor(max_workers = 8) as executor:
futures = [executor.submit(f,item) for item in testDict.items()]
for future in as_completed(futures):
testresult.append(future.result())
if __name__ == '__main__':
main(testDict)
etime = time.time()
print(etime - stime)
#output: 3.4509658813476562
学习多处理和测试内容。进行测试以检查我是否正确实现了这一点。从输出时间来看,并发方法要慢3倍。那怎么了?
我的目标是并行处理一个脚本,该脚本主要在大约500个项目的字典上运行。每个循环将处理和更新这500个项目的值。这个循环可以说是5000代。 k,v对中没有一个与其他k,v对交互。 [它是一种遗传算法]。
我还在寻找有关如何并行化上述目标的指南。如果我在遗传算法代码中对每个函数使用正确的并发期货方法,其中每个函数需要输入一个字典并输出一个新字典,这会有用吗?任何指南/资源/帮助表示赞赏。
编辑:如果我运行以下示例:https://docs.python.org/3/library/concurrent.futures.html#processpoolexecutor-example,则解决方法要比默认的循环检查多3倍。
解决方法
这里有几个基本问题,您使用的是numpy,但您没有对计算进行向量化。在这里编写代码的方式不会使您受益于numpy的速度优势,还可能只使用标准库math
模块,对于这种类型的代码,它比numpy更快:
# 0.089sec
import math
for k,v in testDict.items():
for i in range(1000):
v = math.tanh(v)
newdict.append(v)
对操作进行矢量化处理后,才可以看到numpy的好处:
# 0.016sec
for k,v in testDict.items():
arr = no.full(1000,v)
arr2 = np.tanh(arr)
newdict.append(arr2[-1])
为进行比较,您的原始单线程代码在我的测试机上的运行时间为1.171秒。如您所见,如果使用不正确,NumPy可能比纯Python慢几个数量级。
现在继续说明为什么看到自己的内容。
说实话,我无法复制您的计时结果。对于我来说,您的原始多处理代码在macOS(在Python 3.6上)的运行时间为0.299秒,比单进程代码要快。但是,如果我不得不猜测,您可能正在使用Windows?在Windows等平台上,创建子进程并设置环境来运行多处理任务非常昂贵,因此对持续时间少于几秒钟的任务使用多处理具有可疑的好处。如果您对为什么感兴趣,请read here。
此外,在缺少可用的fork()的平台(如Python 3.8或Windows之后的MacOS)中,当您使用多处理时,子进程必须重新导入该模块,因此,如果将两个代码都放在同一个文件中,则必须在子进程中运行单线程代码,然后才能运行多处理代码。您可能需要将测试代码放入函数中,并使用if __name__ == "__main__"
块保护顶级代码。在不使用Mac的非fork-safe框架库的情况下,在使用Python 3.8或更高版本的Mac上,还可以通过调用multiprocessing.set_start_method("fork")
来恢复使用fork方法。
解决了这个问题,继续您的标题问题。
使用多重处理时,需要将数据复制到子流程,然后再复制到主流程以检索结果,并且产生子流程会产生一定的成本。要从多处理中受益,您需要设计工作负载,以使这部分成本可以忽略不计。
如果您的数据来自外部来源,请尝试将数据加载到子进程中,而不是让主进程加载数据,然后再将其传输到子进程,让主进程告诉子进程如何获取其切片数据。在这里,您是在主流程中生成testDict的,因此,如果可以的话,请对其进行并行化,然后将其移至子代。
此外,由于您使用的是numpy,如果正确地向量化了您的操作,则numpy在进行向量化操作时会释放GIL,因此您可以只使用多线程。由于numpy在矢量操作期间不保存GIL,因此您可以在单个Python进程中利用多个线程,并且由于线程共享内存,因此您无需将数据派生或复制到子进程中。