问题描述
我有以下循环。我简化了代码。 inner() 在类似的循环中解析同一个文件,当然没有 .remote() 调用
def outer(self,file):
rv = []
with open(file,'r') as f :
acc1,acc2 = [],[]
for i,line in enumerate(f) :
if i % 10 == 0 : print(f'> {i} ',end="\n")
if i > 25 : break
outeri,txt = line.split(':')
abc = ClassX.inner.remote(txt,file,int(outeri))
acc2.append(abc) #lst of obj-refs
acc1.append(int(outeri))
rv = [z for z in zip(acc1,ray.get([a for a in acc2])) ]
return rv
我想将数据异步收集到 rv 中,我在这里这样做,但没有中介“acc2”。
我有两个问题:
试图理解并行迭代器,但似乎很难/不可行,如何在 readline 之后压缩步骤直到 .remote() 调用
解决方法
编辑:根据说明更改答案。
为了按照对象引用到达的顺序处理对象引用,您需要使用 ray.wait
在对象引用返回时获取对象引用,然后仅在对象引用上调用 ray.get
准备好了。
def outer(file):
outer_is = {}
unfinished_refs = []
with open(file,"r"):
for i,line in enumerate(f):
outeri,txt = line.split(":")
ref = ClassX.inner.remote(txt,file,int(outeri))
outer_is[ref] = int(outeri)
unfinished_refs.append(ref)
# This part will get and process tasks as they finish
while len(unfinished_refs) > 0:
finished,unfinished_refs = ray.wait(unfinished_refs)
outeri = outer_is[finished[0]]
result = ray.get(finished[0])
### Process the result here ###