Ray:遍历列表,并在集群中添加其他参数

问题描述

我有一个遍历列表的函数

for id_ in id_list:
    scrape(id_,str(id_.split('/')[-1]),id_list.index(id_),len(id_list),target_path)

我可以使用以下命令并行运行它:

for id_ in id_list:
        p1 = Process(target=scrape,args=(id_,target_path))
        p1.start()

但是,我想使用Ray并将其在整个集群中并行化。似乎很难让它遍历Ray中的列表-我该怎么做?

解决方法

我不使用Ray,也不知道您的问题是什么,因为您没有显示Ray代码和完整的错误消息,但是如果我必须使用{{1} }进行并行处理,然后我必须首先使用multiprocessing.Pool()循环创建带有所有参数的列表

for

然后在没有all_args = [] for id_ in id_list: all_args.append( (id_,str(id_.split('/')[-1]),id_list.index(id_),len(id_list),target_path)) 循环的情况下运行它

for

最终,我将不得不使用列表理解才能在一行中完成

p = multiprocessing.Pool(10)
p.starmap(target=scrape,args=all_args)

我希望p = multiprocessing.Pool(10) p.starmap(target=scrape,args=((id_,id_list.index(id_) for id_ in id_lis)) 可能需要相同的方法。


编辑:

示例代码为Ray

Ray

更简单

import ray

ray.init()

@ray.remote
def scrape(id_,text,index_,length,path):
    return 'id: {} | text: {} | index: {} | len: {} | path: {}'.format(id_,path)

id_list = ['2020/1','2020/2','2020/3','2020/4']
target_path = '/home/user'

all_args = []

for id_ in id_list:
     all_args.append( (id_,target_path))

futures = [scrape.remote(*args) for args in all_args]
all_results = ray.get(futures)

print(all_results)