问题描述
在完美的工作流程中,我试图保留每个计划运行的数据。我需要比较每个以前和当前结果的数据。我尝试了 Localresult 和 checkpoint=true 但它不起作用。例如,
from prefect import Flow,task
from prefect.engine.results import LocalResult
from prefect.schedules import IntervalSchedule
from datetime import timedelta,datetime
import os
import prefect
@task("func_task_target.txt",checkpoint=True,result=LocalResult(dir="~/.prefect"))
def file_scan():
files = os.listdir(test)
#prefect.context.a = files
return files
schedule = IntervalSchedule(interval=timedelta(seconds=61))
with Flow("Test persist data",schedule) as flow:
a = file_scan()
flow.run()
我的流程安排为每 61 秒/一分钟。在第一次运行时,我可能会得到空结果,但对于第二次计划运行,我应该得到之前的流结果进行比较。任何人都可以帮助我实现这一目标吗?谢谢!
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)