不管上游任务状态如何执行任务

问题描述

我有一个相对简单的DAG,定义如下:

remove_conf_file

在这里的目标是确保始终执行trigger_rule=TriggerRule.ALL_DONE,而不管先前所有任务的状态如何。

我尝试在Pythonoperator调用中使用remove_conf_file,但是只有在完成所有先前的任务后才会执行check_running_stat_job
如果任务remove_conf_file失败,则将不会执行<a>任务。

无论上游任务的状态为“完成”,“失败”还是“未完成”,我都希望删除文件

我已经尝试了几种DAG配置,但似乎都没有用。

[EDIT]
这是Airflow中的DAG树视图和DAG视图:

DAG standard view

DAG tree view

解决方法

我复制了图像中显示的任务结构,并在任务remove_conf_file失败后成功运行了任务spark_etl

enter image description here

键是将trigger_rule='all_done'添加到remove_conf_file任务中。这并不意味着所有上游任务都需要成功执行,而只是需要完成(无论成功或失败)。我用的是BashOperator而不是您在问题中提到的PythonOperator。

git repository包含带有DAG的相应dockerfile。


编辑:

作为参考,如果我们要测试spark_etl任务的失败和随后remove_conf_file任务的成功执行,以下是成功的代码:

t4 = BashOperator(
    task_id='spark_etl',bash_command='exit 123"',# simulation of task failure
    dag=dag)
    
t6 = BashOperator(
    task_id='remove_conf_file',bash_command='echo "Task 6"',dag=dag,trigger_rule='all_done')

(完整代码可在上述我的git存储库中找到。)