问题描述
我正在使用 prefect 并定义一个 flow 来插入带有 cosmos db 的文档。
问题在于 query_items() 调用是可迭代的,对于大型容器,无法将所有条目保存在内存中。
我相信我的问题可以简化为:
given an iterator,how can I create batches to be processed (mapped) in a prefect flow?
示例:
def big_iterable_function_i_cannot_change():
yield from range(1000000) # some large amount of work
@task
def some_prefect_batching_magic(x):
# magic code here
pass
with Flow("needs-to-be-batched"):
some_prefect_batching_magic.map(big_iterable_function_i_cannot_change())
上面的代码或类似的代码会给我一个错误:
prefect.FlowRunner | Flow run FAILED: some reference tasks failed.
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)