问题描述
我有一个流程,其中我使用.map()
;这样,我“循环”了多个输入,但是我只需要生成一次某些输入,但是我注意到我的流程不断重新生成它们。
是否可以在运行期间缓存/检查任务的结果(用于其他任务)?
我的理解是,可以像这样缓存特定时间:
import datetime
from prefect import task
@task(cache_for=datetime.timedelta(hours=1))
def some_task():
...
但是,如果运行时间少于cache_for
,则高速缓存是否仍将保留用于下一次运行(如果不是,我认为长时间的高速缓存将起作用)。
解决方法
是的,有几种不同的方法可以实现这种缓存:
使用其他缓存验证器
除了配置缓存过期时间(如上所述)之外,您还可以选择配置cache validator。就您而言,您可以使用输入或参数验证器。
使用缓存键
您可以通过在任务上指定cache_key
来“共享”任务之间的缓存(在单个Flow内和跨Flow):
@task(cache_for=datetime.timedelta(hours=1),cache_key="my-key")
def some_task():
...
这将随后通过键而不是任务ID查找候选人Cached
的状态。
使用基于文件的目标
最后,越来越流行的设置是使用file-based target
for your task。然后,您可以使用诸如flow_run_id
之类的东西以及提供给任务的输入来模板化此目标字符串。每当任务运行时,它都会首先检查指定目标位置是否存在数据,如果找到,则不会重新运行。例如:
@task(target="{flow_run_id}/{scheduled_start_time:%Y-%d-%m}/results.bytes")
def some_task():
...
如果同时满足以下两个条件,则此模板具有在目标处重用数据的作用:
- 任务将在同一天重新运行
- 该任务作为同一流程运行的一部分重新运行
然后,您可以跨多个任务(或在您的情况下,跨所有映射的子级)共享此模板。
请注意,您也可以根据需要向target
模板提供输入和参数。