How do I clear objects from the object store in Ray?

Problem description

I am trying out the promising multiprocessing package Ray, and I have a problem I can't seem to solve. My program runs fine the first time, but on a second run this exception is raised on the ray.put() line:

ObjectStoreFullError: Failed to put object ffffffffffffffffffffffffffffffffffffffff010000000c000000 in object store because it is full. Object size is 2151680255 bytes.
The local object store is full of objects that are still in scope and cannot be evicted. Tip: Use the `ray memory` command to list active objects in the cluster.

What I want to do: In my actual code (which I intend to write) I need to process many big_data_objects sequentially. I want to hold one big_data_object in memory at a time and run multiple heavy (independent) computations on it. I want to run those computations in parallel. Once they are done, I have to replace the big_data_object in the object store with a new one and start the computations (in parallel) again.

With my test script I simulate this by starting the script again without ray.shutdown(). If I shut Ray down with ray.shutdown(), the object store is cleared, but re-initializing takes a long time and I cannot process multiple big_data_objects sequentially the way I want to.

Which sources of information I studied: I studied the Ray Design Patterns documentation, in particular the section "Antipattern: Closure capture of large / unserializable object" and what the correct pattern should look like. I also studied the getting-started guide, which led to the test script below.

A minimal example to reproduce the problem: I created a test script to test this. It goes like this:

#%% Imports
import ray
import time
import psutil
import numpy as np


#%% Testing ray
# Start Ray
num_cpus = psutil.cpu_count(logical=False)
if not ray.is_initialized():
    ray.init(num_cpus=num_cpus, include_dashboard=False)

# Define function to do work in parallel
@ray.remote
def my_function(x):  # Later I will have multiple (different) my_functions to extract different features from my big_data_object
    time.sleep(1)
    data_item = ray.get(big_data_object_ref)
    return data_item[0,0]+x

# Define large data
big_data_object = np.random.rand(16400,16400)  # Define an object of approx 2 GB. Works on my machine (16 GB RAM)
# big_data_object = np.random.rand(11600,11600)  # Define an object of approx 1 GB.
# big_data_object = np.random.rand(8100,8100)  # Define an object of approx 500 MB.
# big_data_object = np.random.rand(5000,5000)  # Define an object of approx 190 MB.
big_data_object_ref = ray.put(big_data_object)

# Start 4 tasks in parallel.
result_refs = []
# for item in data:
for item in range(4):
    result_refs.append(my_function.remote(item))
    
# Wait for the tasks to complete and retrieve the results.
# With at least 4 cores, this will take 1 second.
results = ray.get(result_refs)
print("Results: {}".format(results))


#%% Clean-up object store data - Still there is a (huge) memory leak in the object store.
for index in range(4):
    del result_refs[0]

del big_data_object_ref

Where I think it goes wrong: I believe I delete all references to the object store at the end of the script. As a result, the objects should be cleared from the object store (as described here). Apparently something goes wrong, because the big_data_object remains in the object store. The results, however, are deleted from the object store just fine.

Some debugging information: I inspected the object store with the ray memory command, and this is what I get:

(c:\python\cenv38rl) PS C:\WINDOWS\system32> ray memory
---------------------------------------------------------------------------------------------------------------------
 Object ID                                                Reference Type       Object Size   Reference Creation Site
=====================================================================================================================
; worker pid=20952
ffffffffffffffffffffffffffffffffffffffff010000000b000000  LOCAL_REFERENCE       2151680261   c:\python\cenv38rl\lib\site-packages\ray\serialization.py:object_ref_deserializer:45 | c:\python\cenv38rl\lib\site-packages\ray\function_manager.py:fetch_and_register_remote_function:180 | c:\python\cenv38rl\lib\site-packages\ray\import_thread.py:_process_key:140 | c:\python\cenv38rl\lib\site-packages\ray\import_thread.py:_run:87
; worker pid=29368
ffffffffffffffffffffffffffffffffffffffff010000000b000000  LOCAL_REFERENCE       2151680261   c:\python\cenv38rl\lib\site-packages\ray\serialization.py:object_ref_deserializer:45 | c:\python\cenv38rl\lib\site-packages\ray\function_manager.py:fetch_and_register_remote_function:180 | c:\python\cenv38rl\lib\site-packages\ray\import_thread.py:_process_key:140 | c:\python\cenv38rl\lib\site-packages\ray\import_thread.py:_run:87
; worker pid=17388
ffffffffffffffffffffffffffffffffffffffff010000000b000000  LOCAL_REFERENCE       2151680261   c:\python\cenv38rl\lib\site-packages\ray\serialization.py:object_ref_deserializer:45 | c:\python\cenv38rl\lib\site-packages\ray\function_manager.py:fetch_and_register_remote_function:180 | c:\python\cenv38rl\lib\site-packages\ray\import_thread.py:_process_key:140 | c:\python\cenv38rl\lib\site-packages\ray\import_thread.py:_run:87
; worker pid=24208
ffffffffffffffffffffffffffffffffffffffff010000000b000000  LOCAL_REFERENCE       2151680261   c:\python\cenv38rl\lib\site-packages\ray\serialization.py:object_ref_deserializer:45 | c:\python\cenv38rl\lib\site-packages\ray\function_manager.py:fetch_and_register_remote_function:180 | c:\python\cenv38rl\lib\site-packages\ray\import_thread.py:_process_key:140 | c:\python\cenv38rl\lib\site-packages\ray\import_thread.py:_run:87
; worker pid=27684
ffffffffffffffffffffffffffffffffffffffff010000000b000000  LOCAL_REFERENCE       2151680261   c:\python\cenv38rl\lib\site-packages\ray\serialization.py:object_ref_deserializer:45 | c:\python\cenv38rl\lib\site-packages\ray\function_manager.py:fetch_and_register_remote_function:180 | c:\python\cenv38rl\lib\site-packages\ray\import_thread.py:_process_key:140 | c:\python\cenv38rl\lib\site-packages\ray\import_thread.py:_run:87
; worker pid=6860
ffffffffffffffffffffffffffffffffffffffff010000000b000000  LOCAL_REFERENCE       2151680261   c:\python\cenv38rl\lib\site-packages\ray\serialization.py:object_ref_deserializer:45 | c:\python\cenv38rl\lib\site-packages\ray\function_manager.py:fetch_and_register_remote_function:180 | c:\python\cenv38rl\lib\site-packages\ray\import_thread.py:_process_key:140 | c:\python\cenv38rl\lib\site-packages\ray\import_thread.py:_run:87
; driver pid=28684
ffffffffffffffffffffffffffffffffffffffff010000000b000000  LOCAL_REFERENCE       2151680261   c:\python\cenv38rl\lib\site-packages\ray\worker.py:put_object:277 | c:\python\cenv38rl\lib\site-packages\ray\worker.py:put:1489 | c:\python\cenv38rl\lib\site-packages\ray\_private\client_mode_hook.py:wrapper:47 | C:\Users\Stefan\Documents\Python examples\Multiprocess_Ray3_SO.py:<module>:42
---------------------------------------------------------------------------------------------------------------------

--- Aggregate object store stats across all nodes ---
plasma memory usage 2052 MiB, 1 objects, 77.41% full

Some things I tried: If I replace my_function with:

@ray.remote
def my_function(x):  # Later I will have multiple different my_functions to extract separate features from my big_data_objects
    time.sleep(1)
    # data_item = ray.get(big_data_object_ref)
    # return data_item[0,0]+x
    return 5

then the script clears the object store successfully, but my_function can no longer use the big_data_object that I need.

My question is: how do I fix my code so that the big_data_object is removed from the object store at the end of my script, without shutting down Ray?

Note: I installed Ray with pip install ray, which gave me the version I am using now, ray==1.2.0. I use Ray on Windows and develop in Spyder v4.2.5 in a conda (actually miniconda) environment, in case that matters.

EDIT: I also tested this on an Ubuntu machine with 8 GB RAM. For that I used a 1 GB big_data_object. I can confirm the problem also occurs on that machine.

Output of ray memory:

(SO_ray) stefan@stefan-HP-ZBook-15:~/Documents/Ray_test_scripts$ ray memory 
---------------------------------------------------------------------------------------------------------------------
 Object ID                                                Reference Type       Object Size   Reference Creation Site
=====================================================================================================================
; worker pid=18593
ffffffffffffffffffffffffffffffffffffffff0100000001000000  LOCAL_REFERENCE       1076480259   /home/stefan/miniconda3/envs/SO_ray/lib/python3.8/site-packages/ray/function_manager.py:fetch_and_register_remote_function:180 | /home/stefan/miniconda3/envs/SO_ray/lib/python3.8/site-packages/ray/import_thread.py:_process_key:140 | /home/stefan/miniconda3/envs/SO_ray/lib/python3.8/site-packages/ray/import_thread.py:_run:87 | /home/stefan/miniconda3/envs/SO_ray/lib/python3.8/threading.py:run:870
; worker pid=18591
ffffffffffffffffffffffffffffffffffffffff0100000001000000  LOCAL_REFERENCE       1076480259   /home/stefan/miniconda3/envs/SO_ray/lib/python3.8/site-packages/ray/function_manager.py:fetch_and_register_remote_function:180 | /home/stefan/miniconda3/envs/SO_ray/lib/python3.8/site-packages/ray/import_thread.py:_process_key:140 | /home/stefan/miniconda3/envs/SO_ray/lib/python3.8/site-packages/ray/import_thread.py:_run:87 | /home/stefan/miniconda3/envs/SO_ray/lib/python3.8/threading.py:run:870
; worker pid=18590
ffffffffffffffffffffffffffffffffffffffff0100000001000000  LOCAL_REFERENCE       1076480259   /home/stefan/miniconda3/envs/SO_ray/lib/python3.8/site-packages/ray/function_manager.py:fetch_and_register_remote_function:180 | /home/stefan/miniconda3/envs/SO_ray/lib/python3.8/site-packages/ray/import_thread.py:_process_key:140 | /home/stefan/miniconda3/envs/SO_ray/lib/python3.8/site-packages/ray/import_thread.py:_run:87 | /home/stefan/miniconda3/envs/SO_ray/lib/python3.8/threading.py:run:870
; driver pid=17712
ffffffffffffffffffffffffffffffffffffffff0100000001000000  LOCAL_REFERENCE       1076480259   (put object)  | /home/stefan/miniconda3/envs/SO_ray/lib/python3.8/site-packages/ray/_private/client_mode_hook.py:wrapper:47 | /home/stefan/Documents/Ray_test_scripts/Multiprocess_Ray3_SO.py:<module>:43 | /home/stefan/miniconda3/envs/SO_ray/lib/python3.8/site-packages/spyder_kernels/customize/spydercustomize.py:exec_code:453
; worker pid=18592
ffffffffffffffffffffffffffffffffffffffff0100000001000000  LOCAL_REFERENCE       1076480259   /home/stefan/miniconda3/envs/SO_ray/lib/python3.8/site-packages/ray/function_manager.py:fetch_and_register_remote_function:180 | /home/stefan/miniconda3/envs/SO_ray/lib/python3.8/site-packages/ray/import_thread.py:_process_key:140 | /home/stefan/miniconda3/envs/SO_ray/lib/python3.8/site-packages/ray/import_thread.py:_run:87 | /home/stefan/miniconda3/envs/SO_ray/lib/python3.8/threading.py:run:870
---------------------------------------------------------------------------------------------------------------------

--- Aggregate object store stats across all nodes ---
plasma memory usage 1026 MiB, 99.69% full

I have to run the program in Spyder so that after the program finishes I can inspect the object store's memory with ray memory. If I run the program in PyCharm, for example, Ray is terminated automatically when the script completes, so I cannot check whether my script cleared the object store as intended.

Solution

The problem is that your remote function captures big_data_object_ref, and the reference held there is never deleted. Note that when you do something like this:

# Define function to do work in parallel
@ray.remote
def my_function(x):  # Later I will have multiple (different) my_functions to extract different features from my big_data_object
    time.sleep(1)
    data_item = ray.get(big_data_object_ref)
    return data_item[0,0]+x

# Define large data
big_data_object = np.random.rand(16400,16400)
big_data_object_ref = ray.put(big_data_object)

big_data_object_ref is serialized into the remote function definition. So until you delete this serialized function definition (which lives inside Ray's internals), there is a permanent pointer to the object.

Use this kind of pattern instead:

#%% Imports
import ray
import time
import psutil
import numpy as np


#%% Testing ray
# Start Ray
num_cpus = psutil.cpu_count(logical=False)
if not ray.is_initialized():
    ray.init(num_cpus=num_cpus, include_dashboard=False)

# Define function to do work in parallel
@ray.remote
def my_function(big_data_object, x):
    time.sleep(1)
    return big_data_object[0,0]+x

# Define large data
#big_data_object = np.random.rand(16400,16400)  # Define an object of approx 2 GB. Works on my machine (16 GB RAM)
# big_data_object = np.random.rand(11600,11600)  # Define an object of approx 1 GB.
big_data_object = np.random.rand(8100,8100)  # Define an object of approx 500 MB.
# big_data_object = np.random.rand(5000,5000)  # Define an object of approx 190 MB.
big_data_object_ref = ray.put(big_data_object)
print("ref in a driver ",big_data_object_ref)

# Start 4 tasks in parallel.
result_refs = []
# for item in data:
for item in range(4):
    result_refs.append(my_function.remote(big_data_object_ref,item))
    
# Wait for the tasks to complete and retrieve the results.
# With at least 4 cores, this will take 1 second.
results = ray.get(result_refs)
print("Results: {}".format(results))
print(result_refs)

#%% Clean-up object store data
del result_refs
del big_data_object_ref

# Keep the driver alive so the object store can be inspected with `ray memory`.
time.sleep(1000)

The difference is that we now pass big_data_object_ref as an argument to the remote function instead of capturing it inside the remote function.

Note: When an object reference is passed to a remote function, it is automatically dereferenced, so there is no need to call ray.get() inside the remote function. If you do want to call ray.get() explicitly inside the remote function, pass the object reference inside a list or dictionary as the argument to the remote function. In that case you get something like this:

# Remote function
@ray.remote
def my_function(big_data_object_ref_list,x):
    time.sleep(1)
    big_data_object = ray.get(big_data_object_ref_list[0])
    return big_data_object[0,0]+x

# Calling the remote function
my_function.remote([big_data_object_ref],item)

Note 2: You are using Spyder, which uses an IPython console. There are currently some known issues between ray and the IPython console. Just make sure you delete the references in your script rather than with commands typed directly into the IPython console (those delete the reference, but do not remove the item from the object store). To inspect the object store with the ray memory command while your script is running, you can add some code like this at the end of your script:

#%% Testing ray
# ... my ray testing code

#%% Clean-up object store data
print("Wait 10 sec BEFORE deletion")
time.sleep(10)  # Now quickly use the 'ray memory' command to inspect the contents of the object store.

del result_refs
del big_data_object_ref

print("Wait 10 sec AFTER deletion")
time.sleep(10)  # Now again use the 'ray memory' command to inspect the contents of the object store.