在完美任务“批处理”中使用迭代器

问题描述

我正在使用 prefect 并定义一个 flow 来插入带有 cosmos db 的文档。

问题在于 query_items() 调用是可迭代的,对于大型容器,无法将所有条目保存在内存中。

我相信我的问题可以简化为:

  • given an iterator,how can I create batches to be processed (mapped) in a prefect flow?

示例:

def big_iterable_function_i_cannot_change():
    yield from range(1000000) # some large amount of work

@task
def some_prefect_batching_magic(x):
    # magic code here
    pass


with Flow("needs-to-be-batched"):
    some_prefect_batching_magic.map(big_iterable_function_i_cannot_change())

上面的代码或类似的代码会给我一个错误:

prefect.FlowRunner | Flow run FAILED: some reference tasks failed.

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)