Airflow是否可以持久访问短期动态生成任务的元数据?

问题描述

我有一个DAG,每当FileSensor检测到文件时,它就会为每个文件生成任务,以(1)将文件移到暂存区,(2)触发一个单独的DAG来处理文件

FileSensor -> Move(File1) -> TriggerDAG(File1) -> Done
          |-> Move(File2) -> TriggerDAG(File2) -^

在DAG定义文件中,通过遍历FileSensor所监视的目录来生成中间任务,如下所示:

# def generate_move_task(f: Path) -> BashOperator
# def generate_dag_trigger(f: Path) -> TriggerDagRunoperator

with dag:
  for filepath in Path(WATCH_DIR).glob(*):
    sensor_task >> generate_move_task(filepath) >> generate_dag_trigger(filepath)

Move任务将移动导致任务生成文件,因此下一次DAG运行将不会FileSensorMove任务重新触发TriggerDAG文件实际上,调度程序根本不会为该文件生成任务,因为在所有文件都通过Move之后,输入目录中不再有要迭代的内容。 >

这引起了两个问题:

  1. 执行后,任务日志和渲染不再可用。“图形视图”仅显示DAG的当前状态(空),而不显示运行时的状态。 (树状视图显示任务的运行和状态,但是单击“正方形”并选择任何详细信息会导致气流错误。)
  2. 由于争用情况,下游任务可能会陷入困境。一个任务是将原始文件移动到暂存区。如果这花费的时间比调度程序的轮询时间更长,则调度程序将不再收集下游TriggerDAG(File1)任务,这意味着即使上游任务成功运行,也未计划执行该任务。好像下游任务永远不存在。

通过将任务序列更改为copy(File1) -> TriggerDAG(File1) -> Remove(File1)可以解决竞态条件问题,但更广泛的问题仍然存在:是否可以持久保存动态生成的任务,或者至少可以通过以下方式持续访问它们:气流接口?

解决方法

虽然不清楚,但我假设您不是通过协调器DAG触发的下游DAG不是为每个文件动态生成的(例如Move&TriggerDAG任务);换句话说,不同于您的“移动”任务不断出现和消失(基于文件),下游DAG是静态的,并且始终停留在其中


您已经建立了一个相对复杂的工作流,该工作流可以执行高级任务,例如动态生成任务和触发外部DAG。我认为,只要对DAG的结构稍加修改,就可以摆脱麻烦(这也是IMO的高级功能)

  1. Move任务从上游 orchestrator DAG移到下游(按文件) process DAG
  2. 让上游 orchestrator DAG做两件事
  3. 感应/等待文件出现
  4. 对于每个文件,触发下游的处理 DAG(实际上您已经在执行)。

对于Orchestrator DAG,您可以选择以下任一方式

  1. 只有一个任务,即文件感知+触发每个文件的下游DAG
  2. 有两个任务(我希望这样做)
    • 第一个任务感知文件,并在文件出现时将其列表发布到XCOM
    • 第二个任务读取该XCOM和foreach文件,并触发其对应的DAG

但是无论选择哪种方式,都必须从中复制相关的代码段

这是描述两种任务方法的图

enter image description here

,

对于标题问题的简短回答是,从Airflow 1.10.11开始,不,这似乎无法实现。为了呈现DAG /任务详细信息,Airflow网络服务器始终会按照当前定义并收集到DagBag的DAG和任务进行查询。如果定义更改或消失,那么运气不好。仪表板仅在表中显示日志条目。它不会探查日志中是否存在先前的逻辑(似乎也没有存储除标题以外的大部分逻辑)。

y2k-shubham provides an excellent solution to the unspoken question of "how can I write DAGs/tasks so that the transient metadata are accessible"。他的解决方案的潜台词是:在每次任务运行时将瞬态元数据转换为Airflow存储的内容,但保持任务本身固定不变。 XCom是他在这里使用的解决方案,它确实显示在任务实例的详细信息/日志中。

Airflow能否实现对短暂的一次性任务的持久接口访问,这些一次性任务的定义从DagBag中消失了?有可能但不太可能,原因有两个:

  1. 在呈现仪表板时,这将需要Web服务器探查历史日志,而不仅仅是当前的DagBag,这将需要额外的基础架构来保持Web界面的敏捷性,并使显示非常混乱。
  2. li>
  3. As y2k-shubham notes in a comment to another question of mine出逃和更改任务/ DAG是Airflow的反模式。我想这会使下一个功能很难卖出去。