我有任务失败时的松弛警报,但我也希望有恢复消息。
当任务最初失败时,它会在其 on_failure_callback
中执行 xcom_push
。我在此处保存的内容可在下一次 DAG 运行中使用:
context['ti'].xcom_pull(key='my_task_state',
task_ids=context['task'].task_id,
include_prior_dates=True)
但是,如果我清除失败的任务以便它重新运行,在其 on_failure_callback
/on_success_callback
中,我会尝试此操作以获取我在初始尝试中保存的值:
context['ti'].xcom_pull(key='my_task_state',
task_ids=context['task'].task_id,
include_prior_dates=False)
这将返回 None
。如果我设置 include_prior_dates=True
,它将返回上一个 DAG 运行的值,但不会返回任务已清除的当前值。
我是不是做错了什么,或者是否有一种变通方法可以用来获得我正在寻找的 XCom 值?
最佳答案
Yong Wang's answer很好地解释了为什么我无法获得我想要的值。不过,我想出了一个解决方法。
xcom_push
和 xcom_pull
都只是调用 XCom 上的类方法。你可以直接调用这些。事实证明,您可以使用一个虚构的任务 ID,它会将其保存到该 ID 下的 xcom
表中。由于它不是真正的任务,因此当任务(或 DAG)被清除时它不会被删除。
from airflow.models import XCom
def set_xcom(context, value):
XCom.set(key='my_key',
value=value
task_id='{}_SOME_SUFFIX'.format(context['ti'].task_id),
dag_id=context['ti'].dag_id,
execution_date=context['ti'].execution_date)
def get_xcom(context):
return XCom.get_one(context['ti'].execution_date,
key='my_key',
task_id='{}_SOME_SUFFIX'.format(context['ti'].task_id),
dag_id=context['ti'].dag_id,
include_prior_dates=False)
这不是使用 XCom 的标准方式,所以我以后升级到新版本的 Airflow 时必须小心。
关于airflow - 从 Airflow 中已清除的任务中获取 XComs,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57470630/