ray如何处理封闭范围的变量?

问题描述

请考虑以下示例:

import numpy as np
import ray
import time

A = np.array([42] * 4200)   

@ray.remote
def foo1(x):
    x[5]**2

@ray.remote
def foo2(j):
    A[j]**2

ray.init()

#
# some warmup for ray
#

start = time.perf_counter()
for _ in range(1000):
    ray.get(foo1.remote(A))
time_foo1 = time.perf_counter() - start

start = time.perf_counter()
for _ in range(1000):
    ray.get(foo2.remote(5))
time_foo2 = time.perf_counter() - start

print(time_foo1,time_foo2)

ray.shutdown()

看来time_foo2time_foo1小得多。我天真的猜测是每次调用A时ray都会序列化foo1。但是,即使我手动将A放在对象存储中并将对象引用传递给foo1,也看不到任何改进。有人可以启发我幕后的情况吗?

解决方法

运行您的代码时,我得到0.8745803280000004 0.672677727。因此foo2较小,但不是很多(也许A在您的原始代码中较大?)。话虽如此,这是关于射线在做什么的解释。

使用ray.remote注释函数时,会将其序列化,以便可以将其发送到远程进程以运行。 Ray使用cloudpickle进行序列化。当一个函数被序列化时,其全局依赖性也被序列化。

在下面的示例中,A是对必须序列化的全局变量的依赖关系的示例。

@ray.remote
def foo2(j):
    A[j]**2

调用远程函数时,Ray必须将参数传递给远程函数。有一些针对小型对象的优化,但针对大型对象的逻辑大致如下:

for each arg:
    if arg is an ObjectRef,do nothing
    else,replace arg with ray.put(arg)

在远程工作者上,当调用远程函数时,在实际调用函数之前,我们在所有ObjectRef上调用ray.get(同样,我们只关注大型对象)。 ray.get可以从诸如缓存或零拷贝读取之类的优化中受益,因此它通常比ray.put便宜很多。

实际上,这意味着下面的代码

@ray.remote
def foo(arg):
    # At this point ray.get(arg_ref) has already happened

A = np.arange(1_000_000)
foo.remote(A) # This is the same as foo.remote(ray.put(A))
foo.remote(A) # Same as foo.remote(ray.put(A)),which means it has happened twice now

如果我们显式调用ray.put,我们可以将自己保存为put

A_ref = np.put(A) 
foo.remote(A_ref) # ray.put is not called here
foo.remote(A_ref) # again,ray.put is not called

使用A的一百万个输入矩阵运行这些示例时,我得到以下次数(here's my sample code):

Time putting A every time: 3.041259899
Time passing ref of A: 0.7547513060000002
Time serializing A in function: 0.7694220469999999

请注意,虽然序列化A的速度很快,但是这是一种不好的做法,因此不建议这样做。这是因为对象放在对象存储中,而序列化的函数放在控件存储中,而控件存储不是为传递大量数据而构建的。