问题描述
我有一个DAG,每当FileSensor
检测到文件时,它就会为每个文件生成任务,以(1)将文件移到暂存区,(2)触发一个单独的DAG来处理文件。
FileSensor -> Move(File1) -> TriggerDAG(File1) -> Done
|-> Move(File2) -> TriggerDAG(File2) -^
在DAG定义文件中,通过遍历FileSensor所监视的目录来生成中间任务,如下所示:
# def generate_move_task(f: Path) -> BashOperator
# def generate_dag_trigger(f: Path) -> TriggerDagRunoperator
with dag:
for filepath in Path(WATCH_DIR).glob(*):
sensor_task >> generate_move_task(filepath) >> generate_dag_trigger(filepath)
Move
任务将移动导致任务生成的文件,因此下一次DAG运行将不会FileSensor
或Move
任务重新触发TriggerDAG
该文件。 实际上,调度程序根本不会为该文件生成任务,因为在所有文件都通过Move
之后,输入目录中不再有要迭代的内容。。 >
这引起了两个问题:
- 执行后,任务日志和渲染不再可用。“图形视图”仅显示DAG的当前状态(空),而不显示运行时的状态。 (树状视图显示任务的运行和状态,但是单击“正方形”并选择任何详细信息会导致气流错误。)
- 由于争用情况,下游任务可能会陷入困境。第一个任务是将原始文件移动到暂存区。如果这花费的时间比调度程序的轮询时间更长,则调度程序将不再收集下游
TriggerDAG(File1)
任务,这意味着即使上游任务成功运行,也未计划执行该任务。好像下游任务永远不存在。
通过将任务序列更改为copy(File1) -> TriggerDAG(File1) -> Remove(File1)
可以解决竞态条件问题,但更广泛的问题仍然存在:是否可以持久保存动态生成的任务,或者至少可以通过以下方式持续访问它们:气流接口?
解决方法
虽然不清楚,但我假设您不是通过协调器DAG触发的下游DAG不是为每个文件动态生成的(例如Move&TriggerDAG任务);换句话说,不同于您的“移动”任务不断出现和消失(基于文件),下游DAG是静态的,并且始终停留在其中
您已经建立了一个相对复杂的工作流,该工作流可以执行高级任务,例如动态生成任务和触发外部DAG。我认为,只要对DAG的结构稍加修改,就可以摆脱麻烦(这也是IMO的高级功能)
- 将
Move
任务从上游 orchestrator DAG移到下游(按文件) process DAG - 让上游 orchestrator DAG做两件事
- 感应/等待文件出现
- 对于每个文件,触发下游的处理 DAG(实际上您已经在执行)。
对于Orchestrator DAG,您可以选择以下任一方式
- 只有一个任务,即文件感知+触发每个文件的下游DAG
- 有两个任务(我希望这样做)
- 第一个任务感知文件,并在文件出现时将其列表发布到XCOM
- 第二个任务读取该XCOM和foreach文件,并触发其对应的DAG
但是无论选择哪种方式,都必须从中复制相关的代码段
-
FileSensor
(以便能够感知文件,然后在XCOM
中发布其名称)和 -
TriggerDagRunOperator
(以便能够通过单个任务触发多个DAG)
这是描述两种任务方法的图
,对于标题问题的简短回答是,从Airflow 1.10.11开始,不,这似乎无法实现。为了呈现DAG /任务详细信息,Airflow网络服务器始终会按照当前定义并收集到DagBag
的DAG和任务进行查询。如果定义更改或消失,那么运气不好。仪表板仅在表中显示日志条目。它不会探查日志中是否存在先前的逻辑(似乎也没有存储除标题以外的大部分逻辑)。
y2k-shubham provides an excellent solution to the unspoken question of "how can I write DAGs/tasks so that the transient metadata are accessible"。他的解决方案的潜台词是:在每次任务运行时将瞬态元数据转换为Airflow存储的内容,但保持任务本身固定不变。 XCom是他在这里使用的解决方案,它确实显示在任务实例的详细信息/日志中。
Airflow能否实现对短暂的一次性任务的持久接口访问,这些一次性任务的定义从DagBag
中消失了?有可能但不太可能,原因有两个:
- 在呈现仪表板时,这将需要Web服务器探查历史日志,而不仅仅是当前的
DagBag
,这将需要额外的基础架构来保持Web界面的敏捷性,并使显示非常混乱。 li>
- As y2k-shubham notes in a comment to another question of mine,出逃和更改任务/ DAG是Airflow的反模式。我想这会使下一个功能很难卖出去。