问题描述
任务失败时,是否有可能在期间提取先前在其他任务中设置的 XCOM 值> on_failure_callback 执行?
更具体地说,例如:
dag: task1 >> task2
- 任务1 成功运行并在 Xcom 中设置
- 任务2 失败
- on_failure_callback 被呼叫
key="test"
value=123
是否可以在 on_failure_callback 中检索键test
的值?
我尝试过这样,但是似乎没有任何价值:
# Daf configuration
...
"on_failure_callback": deploy_failure,...
# In task1
kwargs["ti"].xcom_push(key="test",value=123)
# on_failure_callback method
def deploy_failure(context):
print("/!\ Deploy failure callback triggered...")
test_value = context.get("ti").xcom_pull(key="test")
print(test_value)
test_value 为None
我确定Xcom值已设置,因为我可以在 Airflow 后端看到它。
有什么主意吗?
解决方法
我想fail_callback中的provide_context
有问题。您可以通过直接访问XCom类来解决此问题:
from airflow.models import XCom
def deploy_failure(context):
print("/!\ Deploy failure callback triggered...")
test_value = XCom.get_one(execution_date = context.get('execution_date'),key='test')
print("ALERT: {0}".format(test_value))