如何从on_failure_callback中提取XCOM值

问题描述

任务失败时,是否有可能在期间提取先前在其他任务中设置的 XCOM 值> on_failure_callback 执行?

更具体地说,例如:

dag: task1 >> task2
  • 任务1 成功运行并在 Xcom
  • 中设置key="test" value=123
  • 任务2 失败
  • on_failure_callback 被呼叫

是否可以在 on_failure_callback 中检索键test的值?

我尝试过这样,但是似乎没有任何价值:

# Daf configuration
    ...
    "on_failure_callback": deploy_failure,...

# In task1
    kwargs["ti"].xcom_push(key="test",value=123)

# on_failure_callback method
def deploy_failure(context):
    print("/!\ Deploy failure callback triggered...")
    test_value = context.get("ti").xcom_pull(key="test")
    print(test_value)

test_value None

我确定Xcom值已设置,因为我可以在 Airflow 后端看到它。

有什么主意吗?

解决方法

我想fail_callback中的provide_context有问题。您可以通过直接访问XCom类来解决此问题:

from airflow.models import XCom

def deploy_failure(context):
    print("/!\ Deploy failure callback triggered...")
    test_value = XCom.get_one(execution_date = context.get('execution_date'),key='test')
    print("ALERT: {0}".format(test_value))

相关问答

依赖报错 idea导入项目后依赖报错,解决方案:https://blog....
错误1:代码生成器依赖和mybatis依赖冲突 启动项目时报错如下...
错误1:gradle项目控制台输出为乱码 # 解决方案:https://bl...
错误还原:在查询的过程中,传入的workType为0时,该条件不起...
报错如下,gcc版本太低 ^ server.c:5346:31: 错误:‘struct...