python - 从一个 Airflow DAG 返回值到另一个

标签 python airflow apache-airflow-xcom

我的 DAG(我们称之为 DAG_A)使用 trigger_dagrun 启动另一个 DAG(DAG_B)运算符(operator)。 DAG_B 的任务使用 XCOM,我想在完成后从 DAG_B 运行的任务之一(正是我开始的任务)中获取 XCOM 值。
使用 XCOM 不是硬性要求 - 基本上 Airflow 本身提供的任何(合理的)机制都可以工作。如果需要,我可以更改 DAG_B。
找不到此类案例的任何示例,因此感谢您的帮助。
计划 B 是让 DAG_B 将 XCOM 值与一些运行 ID 一起保存到一些持久性存储中,例如 DB 或文件,DAG_A 将从那里获取它。但如果有一些内置机制可用,我想避免这种复杂化。

最佳答案

您可以通过传入 dag_id 从另一个 dag 中提取 XCOM 值。至 xcom_pull() (请参阅 task_instance.xcom_pull() function documentation )。只要您使用与当前 DAG 相同的执行日期触发 subdag,这就会起作用。这可以通过模板化 execution_date 轻松实现。值(value):

trigger = TriggerDagRunOperator(
    task_id="trigger_dag_b",
    trigger_dag_id="DAG_B",
    execution_date="{{ execution_date }}",
    ...
)
然后,假设您使用了 ExternalTaskSensor传感器等待特定任务完成或使用 wait_for_completion=True在您的 TriggerDagRunOperator()任务,稍后您可以使用 task_instance.xcom_pull(dag_id="DAG_B", ...) 拉取 XCOM (添加任务 id 和/或您要提取的 XCOM key )。
如果您不反对编写 Python 运算符,您还可以导入 XCom模型,只需使用它的 XCom.get_one() method直接地:
value = XCom.get_one(
    execution_date=ti.execution_date,
    key="target key",
    task_id="some.task.id",
    dag_id="DAG_B",
)
我使用了类似的技术,使用多 dagrun 触发器(处理可变数量的资源);这比较棘手,因为在这种情况下您不能重复使用执行日期(每个 dagrun 必须有一个唯一的 (dag_id, execution_date) 元组)。
在这些情况下,我要么使用直接查询(使用触发器存储在 XCom 中的 dagrun id 将 SQLAlchemy XCom 模型与 DagRun 模型结合起来,而不是依赖于执行日期匹配),或者通过配置来避免整个问题前面的 subdags。后者是通过设置带有配置的子 dag 来实现的,该配置告诉它在哪里输出父 dag 然后选择的结果。文档似乎没有正确提及这一点,但 conf论据 TriggerDagRun()也支持模板化,因此您可以在其中生成字典作为子 dag 的输入,子 dag 中的任务然后通过 params 引用配置.

关于python - 从一个 Airflow DAG 返回值到另一个,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/67299728/

相关文章:

python - 如何在 Python 中从 SXS 加载 C DLL?

google-bigquery - Airflow BigQueryOperator : how to save query result in a partitioned Table?

python - 无法在 GitHub 上安装 Python 库的最新候选版本

python - 如何在 sla_miss_callback 函数中获取上下文对象

airflow - key 错误 : 'ti' in Apache Airflow xcom

python - Django修改密码问题,super(type, obj) : obj must be an instance or subtype of type

python - OpenCV 版本 4.1.0 drawContours 错误 : (-215:Assertion failed) npoints > 0 in function 'drawContours'

python - Numba 字典 : signature in JIT() decorator

airflow - 在 Airflow 中创建子标签时访问父 dag 上下文?