XCOM和BashOperators的气流依赖性

问题描述

但是这不能正常工作,我应该承认这是我第一次使用python。任何帮助将非常有用。我整理了一个测试DAG来执行以下操作,但是它不起作用:

  • 运行任务t1并返回值
  • 如果1的值为ALL_SUCCESS,则运行任务t2
    from datetime import datetime
    from airflow.models import DAG
    from airflow.operators.bash_operator import BashOperator

    def set_trigger(taskid,**kwargs):
        xcomValue = {{ task_instance.xcom_pull(task_ids=taskid) }}
        print( xcomValue," <------- LOOK HERE XCOM VAR")
       if(xcomValue == "0"):
          return TriggerRule.ALL_SUCCESS
       return TriggerRule.ALL_FAILED 

    dag = DAG(dag_id="example_bash_operator",schedule_interval=None,start_date=datetime(2018,12,31) ) as dag:

    t1 = BashOperator(
        task_id="t1",bash_command='do something && echo 0 ',dag=dag
    )

    t2 = BashOperator(
          task_id="t2",bash_command='do something else here ',trigger_rule=set_trigger,dag=dag,)

    t1 >> t2
    ```

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)