airflow - 在 Airflow 中创建子标签时访问父 dag 上下文?

标签 airflow apache-airflow-xcom

我试图在 subdag 创建时访问来自父 dag 的一些 xcom 数据,我正在寻找在互联网上实现这一目标,但我没有找到任何东西。

def test(task_id):
    logging.info(f' execution of task {task_id}')


def load_subdag(parent_dag_id, child_dag_id, args):
    dag_subdag = DAG(
        dag_id='{0}.{1}'.format(parent_dag_id, child_dag_id),
        default_args=args,
        schedule_interval="@daily",
    )
    with dag_subdag:
        r = DummyOperator(task_id='random')

        for i in range(r.xcom_pull(task_ids='take_Ana', key='the_message', dag_id=parent_dag_id)):
            t = PythonOperator(
                task_id='load_subdag_{0}'.format(i),
                default_args=args,
                python_callable=print_context,
                op_kwargs={'task_id': 'load_subdag_{0}'.format(i)},
                dag=dag_subdag,
            )

    return dag_subdag

load_tasks = SubDagOperator(
        task_id='load_tasks',
        subdag=load_subdag(dag.dag_id,
                           'load_tasks', args),
        default_args=args,
    )

我的代码出现此错误
1  | Traceback (most recent call last):
airflow_1  |   File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 374, in process_file
airflow_1  |     m = imp.load_source(mod_name, filepath)
airflow_1  |   File "/usr/local/lib/python3.6/imp.py", line 172, in load_source
airflow_1  |     module = _load(spec)
airflow_1  |   File "<frozen importlib._bootstrap>", line 684, in _load
airflow_1  |   File "<frozen importlib._bootstrap>", line 665, in _load_unlocked
airflow_1  |   File "<frozen importlib._bootstrap_external>", line 678, in exec_module
airflow_1  |   File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
airflow_1  |   File "/app/dags/airflow_dag_test.py", line 75, in <module>
airflow_1  |     'load_tasks', args),
airflow_1  |   File "/app/dags/airflow_dag_test.py", line 55, in load_subdag
airflow_1  |     for i in range(r.xcom_pull(task_ids='take_Ana', key='the_message', dag_id=parent_dag_id)):
airflow_1  | TypeError: xcom_pull() missing 1 required positional argument: 'context'

最佳答案

错误很简单:您缺少 context 方法所需的 xcom_pull() 参数。但是你真的不能只创建 context 来传递给这个方法;它是一个 Python 字典,Airflow 传递给 anchor 方法,如 pre_execute() execute() and BaseOperator (所有 Operator 的父类)。

换句话说, context 仅在 Operator 实际执行时才可用 ,而不是在 DAG -definition 期间。这是有道理的,因为在 Airflow 的分类中, xcom s 是 task s 之间的实时通信机制:它们在运行时相互交谈。

但最终 Xcom s,就像所有其他 Airflow 模型一样,被持久化在后端元数据库中。所以当然你可以直接从那里检索它(显然只有过去运行过的 task 的XCOM)。虽然我没有代码片段,但您可以查看 cli.py ,他们使用 SQLAlchemy ORM 来处理模型和后端数据库。请理解,这意味着每次解析 DAG -definition 文件时都会向您的后端数据库触发查询,这发生得相当快。

有用的链接

  • How can one set a variable for use only during a certain dag_run
  • How to pull xcom value from other task instance in the same DAG run (not the most recent one)?


  • 编辑-1

    看了你的代码片段后,我感到震惊。假设 xcom_pull() 返回的值会经常变化,那么 taskdag 的数量也会不断变化。这可能会导致 不可预测的行为 (你应该做一些研究,但我对此感觉不太好)

    我建议你重新审视你的整个任务工作流程并浓缩到一个设计中
    - task 的数量和
    - DAG 的结构
    提前知道(在执行 dag 定义文件时)。您当然可以迭代 json 文件/SQL 查询的结果(如前面提到的 SQLAlchemy 事情)等,以生成您的实际 task s,但该文件/db/任何不应该经常更改。

    请理解,仅仅迭代一个列表来生成 task 是没有问题的;不可能的是让 DAG 的结构依赖于 upstream task 的结果。例如,您不能根据上游任务在运行时计算 n 的值,在您的 task 中创建 n DAG

    所以这是不可能的
  • Airflow dynamic tasks at runtime
  • Is there a way to create dynamic workflows in Airflow
  • Dynamically create list of tasks

  • 但这是可能的(包括您想要实现的目标;即使您这样做的方式似乎不是一个好主意)
  • Dynamically Generating DAGs in Airflow
  • Airflow DAG dynamic structure
  • etsy/boundary-layer
  • ajbosco/dag-factory


  • 编辑-2

    事实证明,从上游任务的输出生成任务毕竟是可能的;尽管它需要大量有关 Airflow 内部工作原理的知识以及一点创造力。
  • 事实上,除非你真的理解它,否则我强烈建议远离它。
  • 但对于那些无所不知的人来说,这是诀窍 Proper way to create Dynamic Workflows in Airflow
  • 关于airflow - 在 Airflow 中创建子标签时访问父 dag 上下文?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54745555/

    相关文章:

    Airflow:如何扩展SubDagOperator?

    airflow - 在 BigQueryOperator 中将参数添加为 template_fields

    airflow - 如何获得手动触发 DAG 的 Airflow 用户?

    kubernetes - Airflow k8s 运营商 xcom - 握手状态 403 禁止

    python - 从一个 Airflow DAG 返回值到另一个

    ubuntu - Airflow upstart 脚本在启动/运行状态后立即进入停止/等待状态

    Airflow 平行度

    python - 将 Airflow 用于频繁的任务

    airflow - key 错误 : 'ti' in Apache Airflow xcom