问题描述
与Matlab的parloop
相比,我对Python的并行循环效率低感兴趣。
在这里,我提出一个简单的寻根问题,即在a
和b
之间强行强制初始10 ^ 6初始猜测。
import numpy as np
from scipy.optimize import root
import matplotlib.pyplot as plt
import multiprocessing
# define the function to find the roots
func = lambda x: np.sin(3*np.pi*np.cos(np.pi*x)*np.sin(np.pi*x))
def forfunc(x0):
q = [root(func,xi).x for xi in x0]
q = np.array(q).T[0]
return q
# variables os the problem
a = -3
b = 5
n = int(1e6)
x0 = np.linspace(a,b,n) # list of initial guesses
# the single-process loop
q = forfunc(x0)
# parallel loop
nc = 4
pool = multiprocessing.Pool(processes=nc)
q = np.hstack(pool.map(forfunc,np.split(x0,nc)))
pool.close()
单进程循环耗时1分钟26s,并行循环耗时1min 7s。在加速比为1.28的情况下,我看到了一些改进,但是在这种情况下,效率(timeloop/timeparallel/n_process)
为0.32。
这里发生了什么以及如何提高效率? 我在做错什么吗?
我还尝试了两种方式使用dask.delayed
:
import dask
# Every call is a delayed object
q = dask.compute(*[dask.delayed(func)(xi) for xi in x0])
# Every chunk is a delayed object
q = dask.compute(*[dask.delayed(forfunc)(x0i) for x0i in np.split(x0,nc)])
在这里,两者都比单进程循环花费更多的时间。 第一次尝试的时间为3分钟,第二次尝试的时间为27秒。
解决方法
Dask(或Spark)发生了什么
通过单进程测试,您的循环可以在90秒内执行一百万个任务。因此,平均每个任务花费您的CPU大约90微秒。
在提供灵活性和弹性的分布式计算框架(如Dask或Spark)中,任务的相关开销很小。每个任务的Dask开销低至200 microseconds。 Spark 3.0 documentation建议Spark可以支持短至200 毫秒的任务,这也许意味着Dask的开销实际上比Spark少1000倍。听起来好像Dask在这里确实做得很好!
如果您的任务快于框架的每个任务的开销,则与在相同数量的机器/内核上手动分配工作相比,使用它会发现性能更差。在这种情况下,您会遇到这种情况。
在分块数据Dask示例中,您只有几个任务,因此可以减少开销,从而获得更好的性能。但是,相对于原始的多处理,您可能会因为Dask的开销而对性能造成很小的影响,或者您没有使用Dask集群并在单个进程中运行任务。
多处理(和Dask)应该有帮助
对于这种令人尴尬的并行问题,使用多处理的结果通常是出乎意料的。您可能需要确认计算机上物理内核的数量,尤其要确保没有其他事情在积极利用您的CPU内核。一无所知,我想这就是元凶。
在具有两个物理核心的笔记本电脑上,您的示例需要:
- 单个过程循环2分钟1秒
- 两个过程1分钟2秒
- 四个过程1分钟
- 1分钟5秒,用于一个带有
nc=2
的分块式Dask示例,该示例分为两个块,以及一个由两个工作线程和每个工作线程一个线程组成的LocalCluster。可能需要仔细检查您正在集群上运行。
通过两个进程获得大约2倍的加速与我的笔记本电脑上的预期一致,因为看到更多此进程或CPU绑定任务带来的收益很少或没有收益。与原始多重处理相比,Dask还增加了一些开销。
%%time
# the single-process loop
q = forfunc(x0)
CPU times: user 1min 55s,sys: 1.68 s,total: 1min 57s
Wall time: 2min 1s
%%time
# parallel loop
nc = 2
pool = multiprocessing.Pool(processes=nc)
q = np.hstack(pool.map(forfunc,np.split(x0,nc)))
pool.close()
CPU times: user 92.6 ms,sys: 70.8 ms,total: 163 ms
Wall time: 1min 2s
%%time
# parallel loop
nc = 4
pool = multiprocessing.Pool(processes=nc)
q = np.hstack(pool.map(forfunc,nc)))
pool.close()
CPU times: user 118 ms,sys: 94.6 ms,total: 212 ms
Wall time: 1min
from dask.distributed import Client,LocalCluster,wait
client = Client(n_workers=2,threads_per_worker=1)
%%time
nc = 2
chunks = np.split(x0,nc)
client.scatter(chunks,broadcast=True)
q = client.compute([dask.delayed(forfunc)(x0i) for x0i in chunks])
wait(q)
/Users/nickbecker/miniconda3/envs/prophet/lib/python3.7/site-packages/distributed/worker.py:3382: UserWarning: Large object of size 4.00 MB detected in task graph:
(array([1.000004,1.000012,1.00002,...,4.99998 ... 2,5. ]),)
Consider scattering large objects ahead of time
with client.scatter to reduce scheduler burden and
keep data on workers
future = client.submit(func,big_data) # bad
big_future = client.scatter(big_data) # good
future = client.submit(func,big_future) # good
% (format_bytes(len(b)),s)
CPU times: user 3.67 s,sys: 324 ms,total: 4 s
Wall time: 1min 5s