问题描述
但是这不能正常工作。我得承认这是我第一次使用Python,任何帮助都将非常感激。我整理了一个测试DAG来执行以下操作,但是它不起作用:
- 运行任务t1并返回值
- 如果t1的值为ALL_SUCCESS,则运行任务t2

```python
from datetime import datetime
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
# Question snippet, reproduced exactly as posted (the indentation of the body
# appears to have been lost when the snippet was pasted).
# Intent: look up the XCom value pushed by task ``taskid`` and translate it
# into a trigger rule: ALL_SUCCESS when the value is the string "0",
# ALL_FAILED otherwise.
def set_trigger(taskid,**kwargs):
# NOTE(review): ``{{ ... }}`` is Jinja template syntax. In plain Python it is
# parsed as a set literal nested in a set literal, and ``task_instance`` is
# not defined anywhere in this snippet.
xcomValue = {{ task_instance.xcom_pull(task_ids=taskid) }}
# Debug output of the pulled value.
print( xcomValue," <------- LOOK HERE XCOM VAR")
# The pulled value is compared against the string "0", not the integer 0.
if(xcomValue == "0"):
# NOTE(review): ``TriggerRule`` is not imported by the import lines shown
# above this function.
return TriggerRule.ALL_SUCCESS
return TriggerRule.ALL_FAILED
# Question snippet, reproduced exactly as posted.
# NOTE(review): ``dag = DAG(...) as dag:`` mixes a plain assignment with the
# ``with ... as dag:`` form; as written this line is not valid Python.
dag = DAG(dag_id="example_bash_operator",schedule_interval=None,start_date=datetime(2018,12,31) ) as dag:
# First task: runs a shell command whose last step is ``echo 0``.
t1 = BashOperator(
task_id="t1",bash_command='do something && echo 0 ',dag=dag
)
# Second task: the ``set_trigger`` function object itself (not its result) is
# passed as ``trigger_rule``.
# NOTE(review): presumably ``trigger_rule`` expects a rule name fixed when the
# DAG is defined rather than a callable - confirm against the Airflow docs.
t2 = BashOperator(
task_id="t2",bash_command='do something else here ',trigger_rule=set_trigger,dag=dag,)
# Makes t2 a downstream task of t1.
t1 >> t2
```
解决方法
为什么不使用BranchPythonOperator(参见官方文档)?
这样,仅当t1返回的值为0时,才会运行t2:
from datetime import datetime

from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.utils.trigger_rule import TriggerRule
def set_trigger(taskid, **kwargs):
    """Translate the XCom value pushed by ``taskid`` into a trigger rule.

    Args:
        taskid: ``task_id`` of the upstream task whose XCom value is read.
        **kwargs: Task context; must contain the running task instance
            under the ``'ti'`` key.

    Returns:
        ``TriggerRule.ALL_SUCCESS`` when the pulled value is the string
        ``"0"``, otherwise ``TriggerRule.ALL_FAILED``.

    Raises:
        KeyError: If no ``'ti'`` entry is present in ``kwargs``.
    """
    # Jinja ``{{ ... }}`` templates are only rendered inside templated
    # operator fields; in Python code the task instance has to be taken
    # from the context keyword arguments instead.
    ti = kwargs['ti']
    xcom_value = ti.xcom_pull(task_ids=taskid)
    print(xcom_value, " <------- LOOK HERE XCOM VAR")
    if xcom_value == "0":
        return TriggerRule.ALL_SUCCESS
    return TriggerRule.ALL_FAILED
# Module-level DAG object; every operator below attaches itself through
# ``dag=dag``. (The original ``dag = DAG(...) as dag:`` was a syntax error:
# ``as dag:`` is only valid as part of a ``with`` statement.)
dag = DAG(
    dag_id="example_bash_operator",
    schedule_interval=None,
    start_date=datetime(2018, 12, 31),
)

# First task: the trailing ``echo 0`` is the value read back from XCom by
# ``branch_func``.
# NOTE(review): on Airflow versions where BashOperator does not push its
# output to XCom by default, this task also needs ``xcom_push=True`` -
# confirm against the installed version.
t1 = BashOperator(
    task_id="t1",
    bash_command='do something && echo 0 ',
    dag=dag,
)
def branch_func(**kwargs):
    """Choose the task to run after ``t1`` based on ``t1``'s XCom value.

    Args:
        **kwargs: Task context; must contain the running task instance
            under the ``'ti'`` key.

    Returns:
        ``'t2'`` when the value pulled from ``t1`` converts to the integer
        0, otherwise ``None`` (no task selected).

    Raises:
        KeyError: If no ``'ti'`` entry is present in ``kwargs``.
        TypeError: If ``t1`` pushed no XCom value (``int(None)``).
        ValueError: If the pulled value is not an integer literal.
    """
    ti = kwargs['ti']
    xcom_value = int(ti.xcom_pull(task_ids='t1'))
    # Compare with the integer 0: the value was just converted with int(),
    # so the former comparison against the string '0' could never be true.
    if xcom_value == 0:
        return 't2'
    return None
# Branching task: runs ``branch_func`` with the task context and follows only
# the task id it returns.
check_t1 = BranchPythonOperator(
    task_id='check_t1',
    provide_context=True,
    python_callable=branch_func,
    dag=dag,
)

# Second task. No ``trigger_rule`` override: the rule is a fixed setting read
# when the DAG is defined, so a function cannot be passed there; the decision
# to run t2 is made by ``check_t1`` instead.
t2 = BashOperator(
    task_id="t2",
    bash_command='do something else here ',
    dag=dag,
)

# ``check_t1`` must sit between t1 and t2, otherwise the branch result never
# influences whether t2 runs.
t1 >> check_t1 >> t2