Problem description
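I am building pipeline stages in Python with pypeln.
The problem I am facing is that pypeln does not preserve insertion order when data is pulled out of a stage.
To work around this, I attach a task_id to every element I put into the pipeline, so that each pipeline result can be matched back to its task_id: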
import pypeln as pl
from uuid import uuid4
from itertools import tee

def gen_data():
    for i in xxx:
        task_id = str(uuid4())
        data = xxx
        # ('1ddf3c4c-1ca3-4e46-af67-2b2fd9ef9df8', data), ...
        yield task_id, data

def func(x):
    task_id, data = x
    # do something
    return task_id, result

# duplicate the generator, to iterate in the same insertion order on the results
first, second = tee(gen_data())

result = pl.thread.map(func, first)

# iterate on the duplicated iterator to get the same task_id order
for task_id, _ in second:
    for task_id2, task_res in result:
        # how to consume the iterator while matching the task_id, and yield the task_res?
I basically need some itertools function to do this.
Any idea how to do it in the most Pythonic way?

Edit: here is my naive implementation:
from typing import Any, Iterable, Iterator, Tuple

def ordered_output(input: Iterable[str], output: Iterable[Tuple[str, Any]]) -> Iterator[Any]:
    cached_results = []
    it_output = iter(output)
    for expected_task_id in input:
        # try in cache
        cached = None
        for value in cached_results:
            if value[0] == expected_task_id:
                cached = value
                break
        if cached is not None:
            # test the (task_id, res) tuple, so a result of None is still yielded
            cached_results.remove(cached)
            yield cached[1]
            continue
        # try to find in output
        for value in it_output:
            task_id, res = value
            if expected_task_id == task_id:
                yield res
                break
            # put in cache
            cached_results.append(value)
        else:
            # Output exhausted without a match. Raising StopIteration inside a
            # generator becomes a RuntimeError (PEP 479), so raise LookupError.
            raise LookupError(f"no result for task_id {expected_task_id!r}")
Usage:

only_tasks = (task for task, _ in second)
for res in ordered_output(only_tasks, result):
    print(res)

Thanks!
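
One possible way to simplify the naive implementation above is to replace the list cache with a dict keyed by task_id, so that every early result is found in constant time instead of by a linear scan. The following is only a sketch using the standard library. The name `ordered_output_buffered` and its parameter names are hypothetical, and it assumes each task_id is hashable and appears exactly once in the pipeline output.

```python
from typing import Any, Dict, Hashable, Iterable, Iterator, Tuple


def ordered_output_buffered(
    expected_ids: Iterable[Hashable],
    results: Iterable[Tuple[Hashable, Any]],
) -> Iterator[Any]:
    """Yield results in the order given by expected_ids.

    `results` yields (task_id, result) pairs in arbitrary order.
    (Hypothetical helper; not part of pypeln.)
    """
    pending: Dict[Hashable, Any] = {}  # results that arrived before their turn
    results_it = iter(results)
    for expected in expected_ids:
        # Keep pulling from the pipeline until the expected id has shown up.
        while expected not in pending:
            try:
                task_id, res = next(results_it)
            except StopIteration:
                # The pipeline ended without ever producing this id.
                raise LookupError(f"no result for task_id {expected!r}") from None
            pending[task_id] = res
        yield pending.pop(expected)


# Simulated out-of-order pipeline output.
ids = ["a", "b", "c", "d"]
shuffled = [("c", 3), ("a", 1), ("d", 4), ("b", 2)]
print(list(ordered_output_buffered(ids, shuffled)))  # [1, 2, 3, 4]
```

With the names from the question, this would be called as `ordered_output_buffered(only_tasks, result)`. The buffer only holds results that finished ahead of their turn, so its size depends on how far out of order the stage runs. Note that `tee` also keeps every `(task_id, data)` pair in memory until `second` has consumed it.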