问题描述
for id_ in id_list:
scrape(id_,str(id_.split('/')[-1]),id_list.index(id_),len(id_list),target_path)
我可以使用以下命令并行运行它:
for id_ in id_list:
p1 = Process(target=scrape,args=(id_,target_path))
p1.start()
但是,我想使用Ray并将其在整个集群中并行化。似乎很难让它遍历Ray中的列表-我该怎么做?
解决方法
我不使用Ray
,也不知道您的问题是什么,因为您没有显示Ray
代码和完整的错误消息,但是如果我必须使用{{1} }进行并行处理,然后我必须首先使用multiprocessing.Pool()
循环创建带有所有参数的列表
for
然后在没有all_args = []
for id_ in id_list:
all_args.append( (id_,str(id_.split('/')[-1]),id_list.index(id_),len(id_list),target_path))
循环的情况下运行它
for
最终,我将不得不使用列表理解才能在一行中完成
p = multiprocessing.Pool(10)
p.starmap(target=scrape,args=all_args)
我希望p = multiprocessing.Pool(10)
p.starmap(target=scrape,args=((id_,id_list.index(id_) for id_ in id_lis))
可能需要相同的方法。
编辑:
示例代码为Ray
Ray
更简单
import ray
ray.init()
@ray.remote
def scrape(id_,text,index_,length,path):
return 'id: {} | text: {} | index: {} | len: {} | path: {}'.format(id_,path)
id_list = ['2020/1','2020/2','2020/3','2020/4']
target_path = '/home/user'
all_args = []
for id_ in id_list:
all_args.append( (id_,target_path))
futures = [scrape.remote(*args) for args in all_args]
all_results = ray.get(futures)
print(all_results)