问题描述
我正在以这种方式与 Prefect 安排任务:
#Python script
from prefect import task,Flow
from prefect.tasks.shell import ShellTask
from datetime import timedelta
from datetime import datetime
from prefect.schedules import IntervalSchedule
import os
import sys
schedule = IntervalSchedule(start_date=datetime.Now() + timedelta(seconds=10),interval=timedelta(minutes=1))
can_start = True
with Flow("List files",schedule) as flow:
if can_start:
can_start = False
file_names = os.listdir("/home/admin/data/raw")
file_names = fnmatch.filter(file_names,"*fact*")
process_common.map(file_names)
can_start = True
out = flow.run()
但是如果文件在第一次 Prefect 运行后到达我的目录,则文件名在第二次运行期间以及所有下一次运行期间都保持为空。
file_names = ShellTask(command="ls /home/admin/data/raw | grep fact",return_all=True,log_stderr=True,stream_output=True)
有人知道为什么会这样吗?非常感谢您的帮助。
解决方法
这是一个常见的混淆点 - 您将构建时逻辑与运行时逻辑混为一谈(请参阅 this SO post for another example)。
您希望在运行时生效的所有逻辑都应封装为 Prefect 任务 - 在您的情况下,您可能需要使用 Prefect 的 conditional tasks 来实现您的结果,尽管您也许可以摆脱更简单的事情。
特别是,以下代码似乎具有预期的结果:
@task
def get_filenames():
file_names = os.listdir("/home/admin/data/raw")
file_names = fnmatch.filter(file_names,"*fact*")
return file_names
with Flow("List files",schedule) as flow:
process_common.map(file_names) # if the list is empty,nothing will happen
out = flow.run()
最后,请注意,您可以使用 SKIP signals 根据动态运行时条件有效地将任务标记为“已跳过”。