在完美任务“批处理”中使用迭代器

问题描述

我正在使用 prefect 并定义一个 flow 来插入带有 cosmos db 的文档。

问题在于 query_items() 调用是可迭代的,对于大型容器,无法将所有条目保存在内存中。

我相信我的问题可以简化为:

  • given an iterator,how can I create batches to be processed (mapped) in a prefect flow?

示例:

def big_iterable_function_i_cannot_change():
    yield from range(1000000) # some large amount of work

@task
def some_prefect_batching_magic(x):
    # magic code here
    pass


with Flow("needs-to-be-batched"):
    some_prefect_batching_magic.map(big_iterable_function_i_cannot_change())

上面的代码或类似的代码会给我一个错误

prefect.FlowRunner | Flow run Failed: some reference tasks Failed.

解决方法

您收到此错误是因为您没有将 big_iterable_function_i_cannot_change 定义为 taskprefect 实际上并不直接执行 flowflow 用于制作 schedule,(用 dask 的说法)——然后用于执行流程,(据我所知)。 prefect 中的并行化仅在与 dask executor 一起使用时发生。

这是我对您的 flow 的看法。但是,如果您无法将 big_iterable_function_i_cannot_change 的任务装饰器添加到 task 中,请将其包装在任务中。最后 - 不确定您是否可以将生成器传递给映射任务。

import prefect
from prefect import Flow,Parameter,task

@task
def big_iterable_function_i_cannot_change():
    return range(5) # some large amount of work

@task
def some_prefect_batching_magic(x):
    # magic code here
    pass


with Flow("needs-to-be-batched") as flow:
    itter_res = big_iterable_function_i_cannot_change()
    post_process_res = some_prefect_batching_magic.map(itter_res)

flow.visualize()
state = flow.run()


flow.visualize(flow_state=state)

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...