如何在Python中提高并行循环的效率 Dask或Spark发生了什么多处理和Dask应该有帮助

问题描述

与Matlab的parloop相比,我对Python的并行循环效率低感兴趣。 在这里,我提出一个简单的寻根问题,即在ab之间强行强制初始10 ^ 6初始猜测。

import numpy as np
from scipy.optimize import root
import matplotlib.pyplot as plt
import multiprocessing

# define the function to find the roots
func = lambda x: np.sin(3*np.pi*np.cos(np.pi*x)*np.sin(np.pi*x))

def forfunc(x0):
    q = [root(func,xi).x for xi in x0]
    q = np.array(q).T[0]
    return q

# variables os the problem
a = -3
b = 5
n = int(1e6)
x0 = np.linspace(a,b,n) # list of initial guesses

# the single-process loop
q = forfunc(x0)

# parallel loop
nc = 4
pool = multiprocessing.Pool(processes=nc)
q = np.hstack(pool.map(forfunc,np.split(x0,nc)))
pool.close()

单进程循环耗时1分钟26s,并行循环耗时1min 7s。在加速比为1.28的情况下,我看到了一些改进,但是在这种情况下,效率(timeloop/timeparallel/n_process)为0.32。

这里发生了什么以及如何提高效率? 我在做错什么吗?

我还尝试了两种方式使用dask.delayed

import dask

# Every call is a delayed object
q = dask.compute(*[dask.delayed(func)(xi) for xi in x0])

# Every chunk is a delayed object
q = dask.compute(*[dask.delayed(forfunc)(x0i) for x0i in np.split(x0,nc)])

在这里,两者都比单进程循环花费更多的时间。 第一次尝试的时间为3分钟,第二次尝试的时间为27秒。

解决方法

Dask(或Spark)发生了什么

通过单进程测试,您的循环可以在90秒内执行一百万个任务。因此,平均每个任务花费您的CPU大约90微秒。

在提供灵活性和弹性的分布式计算框架(如Dask或Spark)中,任务的相关开销很小。每个任务的Dask开销低至200 microseconds。 Spark 3.0 documentation建议Spark可以支持短至200 毫秒的任务,这也许意味着Dask的开销实际上比Spark少1000倍。听起来好像Dask在这里确实做得很好!

如果您的任务快于框架的每个任务的开销,则与在相同数量的机器/内核上手动分配工作相比,使用它会发现性能更差。在这种情况下,您会遇到这种情况。

在分块数据Dask示例中,您只有几个任务,因此可以减少开销,从而获得更好的性能。但是,相对于原始的多处理,您可能会因为Dask的开销而对性能造成很小的影响,或者您没有使用Dask集群并在单个进程中运行任务。

多处理(和Dask)应该有帮助

对于这种令人尴尬的并行问题,使用多处理的结果通常是出乎意料的。您可能需要确认计算机上物理内核的数量,尤其要确保没有其他事情在积极利用您的CPU内核。一无所知,我想这就是元凶。

在具有两个物理核心的笔记本电脑上,您的示例需要:

  • 单个过程循环2分钟1秒
  • 两个过程1分钟2秒
  • 四个过程1分钟
  • 1分钟5秒,用于一个带有nc=2的分块式Dask示例,该示例分为两个块,以及一个由两个工作线程和每个工作线程一个线程组成的LocalCluster。可能需要仔细检查您正在集群上运行。

通过两个进程获得大约2倍的加速与我的笔记本电脑上的预期一致,因为看到更多此进程或CPU绑定任务带来的收益很少或没有收益。与原始多重处理相比,Dask还增加了一些开销。

%%time
​
# the single-process loop
q = forfunc(x0)
CPU times: user 1min 55s,sys: 1.68 s,total: 1min 57s
Wall time: 2min 1s
%%time
​
# parallel loop
nc = 2
pool = multiprocessing.Pool(processes=nc)
q = np.hstack(pool.map(forfunc,np.split(x0,nc)))
pool.close()
CPU times: user 92.6 ms,sys: 70.8 ms,total: 163 ms
Wall time: 1min 2s
%%time
​
# parallel loop
nc = 4
pool = multiprocessing.Pool(processes=nc)
q = np.hstack(pool.map(forfunc,nc)))
pool.close()
CPU times: user 118 ms,sys: 94.6 ms,total: 212 ms
Wall time: 1min
from dask.distributed import Client,LocalCluster,wait
client = Client(n_workers=2,threads_per_worker=1)

%%time
​
nc = 2
chunks = np.split(x0,nc)
client.scatter(chunks,broadcast=True)
q = client.compute([dask.delayed(forfunc)(x0i) for x0i in chunks])
wait(q)
/Users/nickbecker/miniconda3/envs/prophet/lib/python3.7/site-packages/distributed/worker.py:3382: UserWarning: Large object of size 4.00 MB detected in task graph: 
  (array([1.000004,1.000012,1.00002,...,4.99998 ... 2,5.      ]),)
Consider scattering large objects ahead of time
with client.scatter to reduce scheduler burden and 
keep data on workers

    future = client.submit(func,big_data)    # bad

    big_future = client.scatter(big_data)     # good
    future = client.submit(func,big_future)  # good
  % (format_bytes(len(b)),s)
CPU times: user 3.67 s,sys: 324 ms,total: 4 s
Wall time: 1min 5s