问题描述
我在加快程序计算速度方面遇到问题。在我的代码的序列化python版本中,我正在计算函数f(x)的值,该函数将为NumPy数组的滑动窗口返回一个浮点数,如下所示:
a = np.array([i for i in range(1,10000000)]) # Some data here
N = 100
result = []
for i in range(N,len(a)):
result.append(f(a[i - N:i]))
由于NumPy数组很大并且f(x)的运行时很高,因此我尝试应用多处理来加快代码速度。通过我的研究,我发现charm4py可能是一个很好的解决方案,它具有池功能,该功能可将数组拆分成块并在生成的进程之间分配工作。我已经实现了charm4py的多处理示例,然后将其翻译为我的情况:
# Split an array into subarrays for sequential processing (takes only 5 seconds)
a = np.array([a[i - N:i] for i in range(N,len(a))])
result = charm.pool.map(f,a,chunksize=512,ncores=-1)
# I'm running this code through "charmrun +p18 example.py"
我遇到的问题是,尽管代码是在功能更强大的实例(18个物理核心与6个物理核心)上执行的,但运行速度却慢得多。
我期望看到〜3倍的改进,但是没有发生。在寻找解决方案时,我了解到昂贵的反序列化/启动新流程会产生一些开销,但是我不确定是否是这种情况。
我非常感谢任何关于如何实现NumPy数组的快速并行处理的反馈或建议(假设函数f(x)没有向量化,需要花费相当长的时间计算,并且在内部产生了大量的不能并行化的特定/个人呼叫?
谢谢!
解决方法
这是基于您的代码段的示例,该代码段使用Ray并行化数组计算。
请注意,执行此操作的最佳方法取决于函数f
的外观。
import numpy as np
import ray
import time
ray.init()
N = 100000
a = np.arange(10**7)
a_id = ray.put(a)
@ray.remote
def f(array,index):
# Do processing
time.sleep(0.2)
return 1
result_ids = []
for i in range(len(a) // N):
result_ids.append(f.remote(a_id,i))
results = ray.get(result_ids)
,
听起来您正在尝试将此操作与Charm或Ray并行化(尚不清楚如何将两者同时使用)。
如果选择使用Ray,并且您的数据是一个numpy数组,则可以利用zero-copy reads来避免任何反序列化开销。
您可能需要稍微优化滑动窗口功能,但看起来可能像这样:
@ray.remote
def apply_rolling(f,arr,start,end,window_size):
results_arr = []
for i in range(start,end - window_size):
results_arr.append(f(arr[i : i + windows_size])
return np.array(results_arr)
请注意,这种结构使我们可以在单个任务(也称为批处理)中多次调用f
。
要使用我们的功能:
# Some small setup
big_arr = np.arange(10000000)
big_arr_ref = ray.put(big_arr)
batch_size = len(big_arr) // ray.available_resources()["CPU"]
window_size = 100
# Kick off our tasks
result_refs = []
for i in range(0,big_arr,batch_size):
end_point = min(i + batch_size,len(big_arr))
ref = apply_rolling.remote(f,big_arr_ref,i,end_point)
result_refs.append(ref)
# Handle the results
flattened = []
for section in ray.get(result_refs):
flattened.extend(section)
我确定您将要自定义此代码,但是这里有两个重要且不错的属性,您可能希望对其进行维护。
批处理:我们正在利用批处理来避免启动太多任务。在任何系统中,并行化都会产生开销,因此我们始终要小心并确保不要启动太多任务。此外,我们正在计算batch_size = len(big_arr) // ray.available_resources()["CPU"]
以确保我们使用与CPU数量完全相同的批次。
共享内存:由于Ray的对象存储支持从numpy数组进行零拷贝读取,因此调用ray.get
或从numpy数组进行读取几乎是免费的(在没有机器的单台机器上网络费用)。不过,在串行化/调用ray.put
时会有一些开销,因此该方法只调用一次put
(昂贵的操作),然后调用ray.get
(隐式调用)很多次。
提示:将数组作为参数直接传递到远程函数时要小心。它将多次调用ray.put
,即使您传递相同的对象,也会 。