airflow - 从 Airflow 中已清除的任务中获取 XComs

标签 airflow

我有任务失败时的松弛警报,但我也希望有恢复消息。

当任务最初失败时,它会在其 on_failure_callback 中执行 xcom_push。我在此处保存的内容可在下一次 DAG 运行中使用:

context['ti'].xcom_pull(key='my_task_state',
                        task_ids=context['task'].task_id,
                        include_prior_dates=True)

但是,如果我清除失败的任务以便它重新运行,在其 on_failure_callback/on_success_callback 中,我会尝试此操作以获取我在初始尝试中保存的值:

context['ti'].xcom_pull(key='my_task_state',
                        task_ids=context['task'].task_id,
                        include_prior_dates=False)

这将返回 None。如果我设置 include_prior_dates=True,它将返回上一个 DAG 运行的值,但不会返回任务已清除的当前值。

我是不是做错了什么,或者是否有一种变通方法可以用来获得我正在寻找的 XCom 值?

最佳答案

Yong Wang's answer很好地解释了为什么我无法获得我想要的值。不过,我想出了一个解决方法。

xcom_pushxcom_pull 都只是调用 XCom 上的类方法。你可以直接调用这些。事实证明,您可以使用一个虚构的任务 ID,它会将其保存到该 ID 下的 xcom 表中。由于它不是真正的任务,因此当任务(或 DAG)被清除时它不会被删除。

from airflow.models import XCom

def set_xcom(context, value):
    XCom.set(key='my_key',
             value=value
             task_id='{}_SOME_SUFFIX'.format(context['ti'].task_id),
             dag_id=context['ti'].dag_id,
             execution_date=context['ti'].execution_date)

def get_xcom(context):
    return XCom.get_one(context['ti'].execution_date,
                        key='my_key',
                        task_id='{}_SOME_SUFFIX'.format(context['ti'].task_id),
                        dag_id=context['ti'].dag_id,
                        include_prior_dates=False)

这不是使用 XCom 的标准方式,所以我以后升级到新版本的 Airflow 时必须小心。

关于airflow - 从 Airflow 中已清除的任务中获取 XComs,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57470630/

相关文章:

airflow - 示例DAG无限期陷入 “running”状态

python - AirFlow 调度程序 - 运行日期

airflow "python operator"将文件写入不同位置

python - 设置 Airflow 计划间隔

java - 约 10 秒后与 Airflow docker 容器断开连接

docker - 如何使用 gpg key 解密 docker 容器中的文件而不将其保存在镜像中?

postgresql - Airflow 将 postgres 数据库的所有表导出到 BigQuery

google-cloud-platform - 更改 Cloud Composer 的 DAG 的默认 GCS 位置

python - 想要创建当前任务下游的 Airflow 任务

具有不同 conda 环境的 Python 任务和 DAG