numpy数组的并行处理问题

问题描述

我在加快程序计算速度方面遇到问题。在我的代码的序列化python版本中,我正在计算函数f(x)的值,该函数将为NumPy数组的滑动窗口返回一个浮点数,如下所示:

a = np.array([i for i in range(1,10000000)]) # Some data here
N = 100
result = []
for i in range(N,len(a)):
    result.append(f(a[i - N:i]))

由于NumPy数组很大并且f(x)的运行时很高,因此我尝试应用多处理来加快代码速度。通过我的研究,我发现charm4py可能是一个很好的解决方案,它具有池功能,该功能可将数组拆分成块并在生成的进程之间分配工作。我已经实现了charm4py的多处理示例,然后将其翻译为我的情况:

# Split an array into subarrays for sequential processing (takes only 5 seconds)
a = np.array([a[i - N:i] for i in range(N,len(a))])
result = charm.pool.map(f,a,chunksize=512,ncores=-1)
# I'm running this code through "charmrun +p18 example.py"

我遇到的问题是,尽管代码是在功能更强大的实例(18个物理核心与6个物理核心)上执行的,但运行速度却慢得多。

我期望看到〜3倍的改进,但是没有发生。在寻找解决方案时,我了解到昂贵的反序列化/启动新流程会产生一些开销,但是我不确定是否是这种情况。

我非常感谢任何关于如何实现NumPy数组的快速并行处理的反馈或建议(假设函数f(x)没有向量化,需要花费相当长的时间计算,并且在内部产生了大量的不能并行化的特定/个人呼叫?

谢谢!

解决方法

这是基于您的代码段的示例,该代码段使用Ray并行化数组计算。

请注意,执行此操作的最佳方法取决于函数f的外观。

import numpy as np
import ray
import time

ray.init()

N = 100000

a = np.arange(10**7)
a_id = ray.put(a)

@ray.remote
def f(array,index):
    # Do processing
    time.sleep(0.2)
    return 1

result_ids = []
for i in range(len(a) // N):
    result_ids.append(f.remote(a_id,i))

results = ray.get(result_ids)
,

听起来您正在尝试将此操作与Charm或Ray并行化(尚不清楚如何将两者同时使用)。

如果选择使用Ray,并且您的数据是一个numpy数组,则可以利用zero-copy reads来避免任何反序列化开销。

您可能需要稍微优化滑动窗口功能,但看起来可能像这样:

@ray.remote
def apply_rolling(f,arr,start,end,window_size):
    results_arr = []
    for i in range(start,end - window_size):
        results_arr.append(f(arr[i : i + windows_size])
    return np.array(results_arr)

请注意,这种结构使我们可以在单个任务(也称为批处理)中多次调用f

要使用我们的功能:

# Some small setup
big_arr = np.arange(10000000)
big_arr_ref = ray.put(big_arr)

batch_size = len(big_arr) // ray.available_resources()["CPU"]
window_size = 100

# Kick off our tasks
result_refs = []
for i in range(0,big_arr,batch_size):
    end_point = min(i + batch_size,len(big_arr))
    ref = apply_rolling.remote(f,big_arr_ref,i,end_point)
    result_refs.append(ref)


# Handle the results
flattened = []
for section in ray.get(result_refs):
    flattened.extend(section)

我确定您将要自定义此代码,但是这里有两个重要且不错的属性,您可能希望对其进行维护。

批处理:我们正在利用批处理来避免启动太多任务。在任何系统中,并行化都会产生开销,因此我们始终要小心并确保不要启动太多任务。此外,我们正在计算batch_size = len(big_arr) // ray.available_resources()["CPU"]以确保我们使用与CPU数量完全相同的批次。

共享内存:由于Ray的对象存储支持从numpy数组进行零拷贝读取,因此调用ray.get或从numpy数组进行读取几乎是免费的(在没有机器的单台机器上网络费用)。不过,在串行化/调用ray.put时会有一些开销,因此该方法只调用一次put(昂贵的操作),然后调用ray.get(隐式调用)很多次。

提示:将数组作为参数直接传递到远程函数时要小心。它将多次调用ray.put,即使您传递相同的对象,也会