使用TaskInstance通过XCOMM从PythonOperator获取返回的值

我正在尝试编写一些气流集成测试,其中,我通过从自定义PythonOperator返回文件列表来模仿gcs_list_operator,然后将其通过xcomm传递给PythonBranchOperator。

在本地运行代码时,我在不使用键的情况下,很难从BranchOperator中的PythonOperator中提取上一个任务的返回值。

kwargs['ti'].xcom_push(key="dummyKey",value=value)

kwargs['ti'].xcom_pull(key='dummyKey',task_ids='push_to_xcoms')

下面的代码说明了我要执行的操作,即返回列表并在不声明键的情况下将其拉出。在气流计划程序中,它可以正常工作并打印列表。但是,在我的IDE中运行时,它不是拉/打印。有人可以给我建议吗?

from airflow.operators.python_operator import PythonOperator
from datetime import datetime
from airflow.models.taskinstance import TaskInstance

DAG = DAG(
  dag_id='example_dag',start_date=datetime(2018,1,1),schedule_interval='@once'
)

def push_function(**kwargs):
    ls = ['a','b','c']
    return ls

push_task = PythonOperator(
    task_id='push_task',python_callable=push_function,provide_context=True,dag=DAG)

def pull_function(**kwargs):
    ti = kwargs['ti']
    ls = ti.xcom_pull(task_ids='push_task')
    print(ls)

pull_task = PythonOperator(
    task_id='pull_task',python_callable=pull_function,dag=DAG)

push_task >> pull_task

# push context
push_to_xcoms_ti = TaskInstance(task=push_task,execution_date=datetime.now())
context = push_to_xcoms_ti.get_template_context()
push_task.execute(context)

# branch operator pull from context
check_content_ti = TaskInstance(task=pull_task,execution_date=datetime.now())
context = check_content_ti.get_template_context()
pull_task.execute(context)``

相关文章

Python中的函数(二) 在上一篇文章中提到了Python中函数的定...
Python中的字符串 可能大多数人在学习C语言的时候,最先接触...
Python 面向对象编程(一) 虽然Python是解释性语言,但是它...
Python面向对象编程(二) 在前面一篇文章中谈到了类的基本定...
Python中的函数(一) 接触过C语言的朋友对函数这个词肯定非...
在windows下如何快速搭建web.py开发框架 用Python进行web开发...