如何使用python Ray在大列表上并行化?

问题描述

我想使用ray对列表的每个元素上的函数操作进行并行化处理。下面是一个简化的代码

import numpy as np
import time

import ray
import psutil
num_cpus = psutil.cpu_count(logical=False)
ray.init(num_cpus=num_cpus)


@ray.remote
def f(a,b,c):
    return a * b - c


def g(a,c):
    return a * b - c


def my_func_par(large_list):
    # arguments a and b are constant just to illustrate
    # argument c is is each element of a list large_list
    [f.remote(1.5,2,i) for i in large_list]


def my_func_seq(large_list):
    # arguments a anf b are constant just to illustrate
    # argument c is is each element of a list large_list
    [g(1.5,i) for i in large_list]

my_list = np.arange(1,10000)

s = time.time()
my_func_par(my_list)
print(time.time() - s)
>>> 2.007

s = time.time()
my_func_seq(my_list)
print(time.time() - s)
>>> 0.0372

问题是,当我给my_func_par计时时,它比my_func_seq慢得多(如上所示,约为54倍)。 ray的一位作者的确回答了this blog评论,这似乎可以解释我正在做的是设置len(large_list)不同的任务,这是不正确的。

如何使用ray并修改上面的代码以并行运行它? (也许通过将large_list拆分为块数等于cpus的块)

编辑:该问题有两个重要条件

  • 函数f需要接受多个参数
  • 可能有必要使用ray.put(large_list),以便将larg_list变量存储在共享内存中,而不是复制到每个处理器中

解决方法

要添加到桑在上面说的话:

Ray Distributed multiprocessing.Pool支持固定大小的Ray Actors池,以便于并行化。

import numpy as np
import time

import ray
from ray.util.multiprocessing import Pool
pool = Pool()

def f(x):
    # time.sleep(1)
    return 1.5 * 2 - x

def my_func_par(large_list):
    pool.map(f,large_list)

def my_func_seq(large_list):
    [f(i) for i in large_list]

my_list = np.arange(1,10000)

s = time.time()
my_func_par(my_list)
print('Parallel time: ' + str(time.time() - s))

s = time.time()
my_func_seq(my_list)
print('Sequential time: ' + str(time.time() - s))

使用以上代码,my_func_par的运行速度更快(约0.1秒)。如果您使用该代码,并使f(x)的速度降低time.sleep之类的速度,则可以看到多处理的明显优势。

,

并行化版本较慢的原因是,运行ray任务不可避免地要运行开销(尽管要花很多精力来优化它)。这是因为并行运行事物需要进行进程间通信,序列化以及类似的事情。

话虽如此,如果您的函数真的非常快(与运行中的函数所花费的时间一样快,则它比分布式计算中的其他开销少的时间),在这种情况下,您的代码非常合适,因为函数f确实很小。运行该功能将花费不到一微秒的时间。

这意味着您应该使f函数的计算量更大,以便从并行化中受益。您提出的解决方案可能无法正常工作,因为即使那样,函数f仍然可能足够轻量,具体取决于列表的大小。