Airflow DAG动态结构

标签 airflow airflow-scheduler

我正在寻找一种解决方案,可以在触发 dag 时决定 dag 结构,因为我不确定必须运行的运算符数量。

请参阅下面我计划创建的执行顺序。

           |-- Task B.1 --|                  |-- Task C.1 --|
           |-- Task B.2 --|                  |-- Task C.2 --|
  Task A --|-- Task B.3 --|---> Task  B ---> |-- Task C.3 --|
           |     ....     |                  |     ....     |
           |-- Task B.N --|                  |-- Task C.N --|

我不确定 N 的值。

这在 Airflow 中可能吗?如果是这样,我该如何实现这一目标。

提前致谢

最佳答案

我过去必须做类似的事情,我编写了一个 DAG,它从 YAML 文件中读取,该文件定义了要创建的任务。

我的情况是,我从中提取数据的表的数量可能每周都会发生变化,而不是每次需要添加新表时都将 DAG 重新部署到生产中,而是将 DAG 指向一个 YAML 文件,该文件描述了要提取哪些表。每次出现新表时,我只需使用新表详细信息编辑 YAML 文件即可。

我认为,如果需要先运行上游任务,然后确定要运行多少下游任务,就像下面的问题一样,但类似的问题:

Generating dynamic tasks in airflow based on output of an upstream task

关于Airflow DAG动态结构,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49114565/

相关文章:

python - 错误: Command "python setup.py egg_info" failed

azure - 我在空闲时收到大量事务(Airflow 和 Azure 文件共享)

python - Airflow 在同一个 dag 的不同时间运行任务?

python - 有什么方法可以找到 DAG 中任意随机任务所花费的时间吗?

kubernetes - 使用 Apache Airflow 在 Kubernetes 中创建部署

airflow - 安排 Airflow dag 30 秒

Airflow 加密变量

Airflow 1.9.0 - 任务执行之间的长时间延迟

google-cloud-platform - 您可以在 Google Cloud Composer 中访问 Airflow CLI 吗?

sqlalchemy - 从 Airflow 连接 postgres 数据库时出错