XCOM和BashOperators的气流依赖性

问题描述

但是这不能正常工作,我应该承认这是我第一次使用python。任何帮助将非常有用。我整理了一个测试DAG来执行以下操作,但是它不起作用:

  • 运行任务t1并返回值
  • 如果1的值为ALL_SUCCESS,则运行任务t2
    from datetime import datetime
    from airflow.models import DAG
    from airflow.operators.bash_operator import BashOperator

    def set_trigger(taskid,**kwargs):
        xcomValue = {{ task_instance.xcom_pull(task_ids=taskid) }}
        print( xcomValue," <------- LOOK HERE XCOM VAR")
       if(xcomValue == "0"):
          return TriggerRule.ALL_SUCCESS
       return TriggerRule.ALL_FAILED 

    dag = DAG(dag_id="example_bash_operator",schedule_interval=None,start_date=datetime(2018,12,31) ) as dag:

    t1 = BashOperator(
        task_id="t1",bash_command='do something && echo 0 ',dag=dag
    )

    t2 = BashOperator(
          task_id="t2",bash_command='do something else here ',trigger_rule=set_trigger,dag=dag,)

    t1 >> t2
    ```

解决方法

为什么不使用BranchPythonOperatordocs):

这样,您仅在t1返回的值为0

时运行t2
from datetime import datetime
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator

def set_trigger(taskid,**kwargs):
    xcomValue = {{ task_instance.xcom_pull(task_ids=taskid) }}
    print( xcomValue," <------- LOOK HERE XCOM VAR")
   if(xcomValue == "0"):
      return TriggerRule.ALL_SUCCESS
   return TriggerRule.ALL_FAILED 

dag = DAG(dag_id="example_bash_operator",schedule_interval=None,start_date=datetime(2018,12,31) ) as dag:

t1 = BashOperator(
    task_id="t1",bash_command='do something && echo 0 ',dag=dag
)

def branch_func(**kwargs):
    ti = kwargs['ti']
    xcom_value = int(ti.xcom_pull(task_ids='t1'))
    if xcom_value == '0':
        return 't2'

check_t1 = BranchPythonOperator(
    task_id='check_t1',provide_context=True,python_callable=branch_func,dag=dag)

t2 = BashOperator(
      task_id="t2",bash_command='do something else here ',trigger_rule=set_trigger,dag=dag,)

t1 >> t2

相关问答

依赖报错 idea导入项目后依赖报错,解决方案:https://blog....
错误1:代码生成器依赖和mybatis依赖冲突 启动项目时报错如下...
错误1:gradle项目控制台输出为乱码 # 解决方案:https://bl...
错误还原:在查询的过程中,传入的workType为0时,该条件不起...
报错如下,gcc版本太低 ^ server.c:5346:31: 错误:‘struct...