使用TaskInstance通过XCOMM从PythonOperator获取返回的值

我正在尝试编写一些气流集成测试,其中,我通过从自定义PythonOperator返回文件列表来模仿gcs_list_operator,然后将其通过xcomm传递给PythonBranchOperator。

在本地运行代码时,我在不使用键的情况下,很难从BranchOperator中的PythonOperator中提取上一个任务的返回值。

kwargs['ti'].xcom_push(key="dummyKey",value=value)

kwargs['ti'].xcom_pull(key='dummyKey',task_ids='push_to_xcoms')

下面的代码说明了我要执行的操作,即返回列表并在不声明键的情况下将其拉出。在气流计划程序中,它可以正常工作并打印列表。但是,在我的IDE中运行时,它不是拉/打印。有人可以给我建议吗?

from airflow.operators.python_operator import PythonOperator
from datetime import datetime
from airflow.models.taskinstance import TaskInstance

DAG = DAG(
  dag_id='example_dag',start_date=datetime(2018,1,1),schedule_interval='@once'
)

def push_function(**kwargs):
    ls = ['a','b','c']
    return ls

push_task = PythonOperator(
    task_id='push_task',python_callable=push_function,provide_context=True,dag=DAG)

def pull_function(**kwargs):
    ti = kwargs['ti']
    ls = ti.xcom_pull(task_ids='push_task')
    print(ls)

pull_task = PythonOperator(
    task_id='pull_task',python_callable=pull_function,dag=DAG)

push_task >> pull_task

# push context
push_to_xcoms_ti = TaskInstance(task=push_task,execution_date=datetime.now())
context = push_to_xcoms_ti.get_template_context()
push_task.execute(context)

# branch operator pull from context
check_content_ti = TaskInstance(task=pull_task,execution_date=datetime.now())
context = check_content_ti.get_template_context()
pull_task.execute(context)``

相关文章

功能概要:(目前已实现功能)公共展示部分:1.网站首页展示...
大体上把Python中的数据类型分为如下几类: Number(数字) ...
开发之前第一步,就是构造整个的项目结构。这就好比作一幅画...
源码编译方式安装Apache首先下载Apache源码压缩包,地址为ht...
前面说完了此项目的创建及数据模型设计的过程。如果未看过,...
python中常用的写爬虫的库有urllib2、requests,对于大多数比...