如何在python中的类中使用射线并行?

问题描述

我想使用ray task 方法而不是ray actor 方法来并行化类中的方法。后者的原因似乎需要更改实例化类的方式(如here所示)。下面是一个玩具代码示例,以及错误

import numpy as np
import ray


class MyClass(object):
    
    def __init__(self):
        ray.init(num_cpus=4)
    
    @ray.remote
    def func(self,x,y):
        return x * y
    
    def my_func(self):
        a = [1,2,3]
        b = np.random.normal(0,1,10000)
        result = []
        # we wish to parallelise over the array `a`
        for sub_array in np.array_split(a,3):
            result.append(self.func.remote(sub_array,b))
        return result

mc = MyClass()
mc.my_func()
>>> TypeError: missing a required argument: 'y'

由于ray似乎并不“意识到”该类而出现错误,因此它需要一个自变量self

如果我们不使用类,则代码可以正常工作

@ray.remote
def func(x,y):
    return x * y

def my_func():
    a = [1,3,4]
    b = np.random.normal(0,10000)
    result = []
    # we wish to parallelise over the list `a`
    # split `a` and send each chunk to a different processor
    for sub_array in np.array_split(a,4):
        result.append(func.remote(sub_array,b))
    return result


res = my_func()
ray.get(res)
>>> [array([-0.41929678,-0.83227786,-2.69814232,...,-0.67379119,-0.79057845,-0.06862196]),array([-0.83859356,-1.66455572,-5.39628463,-1.34758239,-1.5811569,-0.13724391]),array([-1.25789034,-2.49683358,-8.09442695,-2.02137358,-2.37173535,-0.20586587]),array([ -1.67718712,-3.32911144,-10.79256927,-2.69516478,-3.1623138,-0.27448782])]```

我们看到输出是预期的4个数组的列表。如何使用ray使MyClass进行并行处理?

解决方法

一些提示:

  1. 通常建议您仅在python中的函数或类上使用ray.remote装饰器(而不是绑定方法)。

  2. 在函数的构造函数中调用ray.init时应非常小心,因为ray.init并不是幂等的(这意味着程序将在以下情况下失败)您实例化了MyClass的多个实例。相反,您应该确保ray.init在程序中仅运行一次。

我认为使用Ray可以达到两种效果。

您可以将func从类中移出,这样它将成为函数而不是绑定方法。请注意,在这种方法中,MyClass将被序列化,这意味着funcMyClass所做的更改将不会在函数之外的任何地方反映出来。在您的简化示例中,这似乎没有问题。这种方法最容易实现最大并行度。

@ray.remote
def func(obj,x,y):
    return x * y


class MyClass(object):
    def my_func(self):
        ...
        # we wish to parallelise over the array `a`
        for sub_array in np.array_split(a,3):
            result.append(func.remote(self,sub_array,b))
        return result

您可以考虑的另一种方法是使用async actors。在这种方法中,ray actor将通过异步处理并发,但这comes with the limitations of asyncio

@ray.remote(num_cpus=4)
class MyClass(object):
    async def func(self,y):
        return x * y
    
    def my_func(self):
        a = [1,2,3]
        b = np.random.normal(0,1,10000)
        result = []
        # we wish to parallelise over the array `a`
        for sub_array in np.array_split(a,3):
            result.append(self.func.remote(sub_array,b))
        return result