问题描述
remove_conf_file
我在这里的目标是确保始终执行trigger_rule=TriggerRule.ALL_DONE
,而不管先前所有任务的状态如何。
我尝试在Pythonoperator
调用中使用remove_conf_file
,但是只有在完成所有先前的任务后才会执行check_running_stat_job
。
如果任务remove_conf_file
失败,则将不会执行<a>
任务。
无论上游任务的状态为“完成”,“失败”还是“未完成”,我都希望删除文件。
我已经尝试了几种DAG配置,但似乎都没有用。
[EDIT]
这是Airflow中的DAG树视图和DAG视图:
解决方法
我复制了图像中显示的任务结构,并在任务remove_conf_file
失败后成功运行了任务spark_etl
:
键是将trigger_rule='all_done'
添加到remove_conf_file
任务中。这并不意味着所有上游任务都需要成功执行,而只是需要完成(无论成功或失败)。我用的是BashOperator而不是您在问题中提到的PythonOperator。
此git repository包含带有DAG的相应dockerfile。
编辑:
作为参考,如果我们要测试spark_etl
任务的失败和随后remove_conf_file
任务的成功执行,以下是成功的代码:
t4 = BashOperator(
task_id='spark_etl',bash_command='exit 123"',# simulation of task failure
dag=dag)
t6 = BashOperator(
task_id='remove_conf_file',bash_command='echo "Task 6"',dag=dag,trigger_rule='all_done')
(完整代码可在上述我的git存储库中找到。)