带有 pypeln 地图阶段结果的有序迭代器

问题描述

我正在使用 pypeln 在 Python 中构建流水线阶段。

我面临的问题是 pypeln 在从 stage 中提取数据时不保留插入顺序。

为了解决此问题,我为放入管道的每个元素关联了一个 task_id,以便将管道结果与 task_id 关联回。

import pypeln as pl
from uuid import uuid4
from itertools import tee

def gen_data():
    for i in xxx:
        task_id = str(uuid4())
        data = xxx
        # ('1ddf3c4c-1ca3-4e46-af67-2b2fd9ef9df8',data),....
        yield task_id,data

def func(x):
    task_id,data = x
    # do something
    return task_id,result

# duplicate the generator,to iterate in the same insertion order on the results
first,second = tee(gen_data())
result = pl.thread.map(func,first)

# iterate on duplicated iterator to get the same task_id order
for task_id,_ in second:
    for task_id2,task_res in result:
        # how to consume the iterator while matching the task_id,and yield the task_res ? 
    

我基本上需要一些 itertools 函数

  1. 从匹配谓词的结果迭代器中取出第一个元素并产生它
  2. 将不匹配的消耗迭代器元素保留在缓存中
  3. 在检查结果迭代器之前检查缓存
  4. 当在缓存中找到元素时,将其从缓存中删除

知道如何以最 Pythonic 的方式做到这一点吗?

编辑:这是我的幼稚实现:

    def ordered_output(input: Iterable[str],output: Iterable[Tuple[str,Any]]):
        cached_results = []
        it_output = iter(output)
        for expected_task_id in input:
            # try in cache
            found = None
            for value in cached_results:
                task_id,res = value
                if expected_task_id == task_id:
                    found = res
                    cached_results.remove(value)
                    break
            if found is not None:
                yield found
                continue
            # try to find in output
            while True:
                value = next(it_output)
                task_id,res = value
                if expected_task_id == task_id:
                    found = res
                    break
                else:
                    # put in cache
                    cached_results.append(value)
            if found is None:
                # Error,not found ??
                raise stopiteration
            else:
                yield found

用法


only_tasks = (task for task,_ in second)
for res in ordered_output(only_tasks,result):
    print(res)

谢谢!

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)