问题描述
阅读 Output Caching based on a file target 上的文档后,我认为这个工作流应该是输出缓存的一个例子:
from time import sleep
from prefect import Flow,task
from prefect.engine.results import LocalResult
@task(target="func_task_target.txt",checkpoint=True,result=LocalResult(dir="~/.prefect"))
def func_task():
sleep(5)
return 99
with Flow("Test-cache") as flow:
func_task()
if __name__ == '__main__':
flow.run()
我希望 func_task
运行一次,得到缓存,然后在下次运行流时使用缓存的值。但是,似乎每次都运行 func_task
。
我哪里出错了?还是我误解了文档?
解决方法
尝试将环境变量 PREFECT__FLOWS__CHECKPOINTING
设置为 True
import os
os.environ["PREFECT__FLOWS__CHECKPOINTING"] = "True"
您也可以更改结果目录
os.environ["PREFECT__HOME_DIR"] = "path to dir"