问题描述
在Prefect中,假设我有一些管道可以对列表中的每个日期运行f(date),并将其保存到文件中。这是非常常见的ETL操作。在气流中,如果我运行一次,它将回填所有历史日期。如果我再次运行它,它将知道该任务已运行,并且仅运行出现的任何新任务(即最新日期)。
据我所知,Prefect每天都会运行整个管道,即使前一天完成了99%的任务。在不切换到Prefect Cloud的情况下有哪些解决方案?您是否只是做一些事情,例如让每个任务在退出前在redis中缓存其完成?
解决方法
Prefect具有许多处理缓存的一流方法,具体取决于所需的控制量。对于每个任务,您可以指定是否应缓存结果,应缓存结果多长时间以及应如何使缓存无效(年龄,任务的不同输入,流参数值等)。
缓存任务的最简单方法是使用targets,它使您可以指定任务具有可模板化的副作用(通常是本地或Cloud存储中的文件,但也可以是数据库条目,redis键或其他任何内容)。在运行任务之前,它将检查是否存在副作用,如果存在,则跳过运行。
例如,此任务会将其结果写入自动以任务名称和当前日期为模板的本地文件:
@task(result=LocalResult(),target="{task_name}-{today}")
def get_data():
return [1,2,3,4,5]
只要存在匹配的文件,该任务就不会重新运行。由于{today}
是目标名称的一部分,因此将隐式地将任务的值缓存一天。您还可以在模板中使用一个参数(例如回填日期)来复制Airflow的行为。
要获得更多控制权,可以通过在任何任务上设置cache_for
,cache_validator
和cache_key
来使用Prefect的full cache mechanism。如果设置,任务将以Cached
状态而不是Success
状态完成。与适当的编排后端(如Prefect Server或Prefect Cloud)配对时,可以通过以后运行同一任务(或具有相同Cached
的任何任务)来查询cache_key
状态。将来的任务将返回Cached
状态作为其自己的结果。