如何使用射线在函数的并行实例之间使用共享变量

问题描述

我有一个名为 temp共享变量,它应该由名为 do()函数的各种实例不断读取和操作。特别是,使用 ray 模块,我将 do() 装饰如下。

@ray.remote
def do(temp):
    prob1,prob2 = compute_probability(tau1,tau2)
    selected_path = select_path(prob1,prob2)
    if selected_path == 1:
        temp += 1
    update_accumulation(selected_path)

然后,在我的主循环中,我只调用 do() 的所有实例,例如:

temp_id = ray.put(temp)
ray.get([do.remote(temp_id) for _ in range(N)])

但是,在下面的工作代码中,每次循环迭代中temp的值总是0一个人可以指出我的错误所在吗?

import random
import matplotlib.pyplot as plt
import ray

N = 500
l1 = 1
l2 = 2
ru = 0.5
Q = 1
tau1 = 0.5
tau2 = 0.5

epochs = 150

success = [0 for x in range(epochs)]

def compute_probability(tau1,tau2):
    return tau1/(tau1 + tau2),tau2/(tau1 + tau2)

def select_path(prob1,prob2):
    return random.choices([1,2],weights=[prob1,prob2])[0]

def update_accumulation(link_id):
    global tau1
    global tau2
    if link_id == 1:
        tau1 += Q / l1
        return tau1
    if link_id == 2:
        tau2 += Q / l2
        return tau2

def update_evapuration():
    global tau1
    global tau2
    tau1 *= (1-ru)
    tau2 *= (1-ru)
    return tau1,tau2

def report_results(success):
    plt.plot(success)
    plt.show()

ray.init()

@ray.remote
def do(temp):
    prob1,prob2)
    if selected_path == 1:
        temp += 1
    update_accumulation(selected_path)

for epoch in range(epochs-1):
    temp = 0
    temp_id = ray.put(temp)
    ray.get([do.remote(temp_id) for _ in range(N)])
    update_evapuration()
    success[epoch] = temp

report_results(success) 

解决方法

Ray 依赖于 cloudpickle 的补丁版本来pickle 函数。这意味着全局变量与函数一起序列化,因此不会在函数调用之间共享。

处理此问题的推荐方法是使用 Actor。 See this post for more details.