我正在尝试编写一些气流集成测试,其中,我通过从自定义PythonOperator返回文件列表来模仿gcs_list_operator,然后将其通过xcomm传递给PythonBranchOperator。
在本地运行代码时,我在不使用键的情况下,很难从BranchOperator中的PythonOperator中提取上一个任务的返回值。
kwargs['ti'].xcom_push(key="dummyKey",value=value)
kwargs['ti'].xcom_pull(key='dummyKey',task_ids='push_to_xcoms')
下面的代码说明了我要执行的操作,即返回列表并在不声明键的情况下将其拉出。在气流计划程序中,它可以正常工作并打印列表。但是,在我的IDE中运行时,它不是拉/打印。有人可以给我建议吗?
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
from airflow.models.taskinstance import TaskInstance
DAG = DAG(
dag_id='example_dag',start_date=datetime(2018,1,1),schedule_interval='@once'
)
def push_function(**kwargs):
ls = ['a','b','c']
return ls
push_task = PythonOperator(
task_id='push_task',python_callable=push_function,provide_context=True,dag=DAG)
def pull_function(**kwargs):
ti = kwargs['ti']
ls = ti.xcom_pull(task_ids='push_task')
print(ls)
pull_task = PythonOperator(
task_id='pull_task',python_callable=pull_function,dag=DAG)
push_task >> pull_task
# push context
push_to_xcoms_ti = TaskInstance(task=push_task,execution_date=datetime.now())
context = push_to_xcoms_ti.get_template_context()
push_task.execute(context)
# branch operator pull from context
check_content_ti = TaskInstance(task=pull_task,execution_date=datetime.now())
context = check_content_ti.get_template_context()
pull_task.execute(context)``