python - 如何通过单个脚本生成多个 Airflow dags?

标签 python workflow airflow directed-acyclic-graphs

我想使用一个脚本生成多个 Airflow dag。 dag 名称应为“test_parameter”。下面是我的脚本:

from datetime import datetime

# Importing Airflow modules
from airflow.models import DAG
from airflow.operators import DummyOperator

# Specifying the default arguments for the DAG
default_args = {
    'owner': 'Test',
    'start_date': datetime.now()
    }

parameter_list = ["abc", "pqr", "xyz"]

for parameter in parameter_list:
    dag = DAG("test_"+parameter,
              default_args=default_args,
              schedule_interval=None)
    dag.doc_md = "This is a test dag"

    # Creating Start Dummy Operator
    start = DummyOperator(
        task_id="start",
        dag=dag)

    # Creating End Dummy Operator
    end = DummyOperator(
        task_id="end",
        dag=dag)

    # Design workflow of tasks in the dag
    end.set_upstream(start)

所以在这种情况下,它应该创建 3 个 dag:“test_abc”、“test_pqr”和“test_xyz”。

但是在运行脚本时,它只会创建一个 dag“test_xyz”。有关如何解决此问题的任何见解。提前致谢 :)

最佳答案

是的,这是可能的,您可以将每个 DAG 的配置保存在存储中。例如,您可以将配置保存在持久存储 (DB) 中,然后获取配置并将结果保存在缓存中。这样做主要是因为我们想防止每次 DAG 脚本刷新时 dag 脚本从数据库中获取配置。因此,我们使用缓存并保存其过期时间。你可以引用这个article关于如何创建动态 DAG

for i in range(10):
  dag_id = 'foo_{}'.format(i)
  globals()[dag_id] = DAG(dag_id)

反过来,您还希望创建动态子 DAG 和动态任务。希望能帮助到你 :-)

关于python - 如何通过单个脚本生成多个 Airflow dags?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49328051/

相关文章:

python - 为什么从服务器接收到所有数据后客户端套接字连接没有关闭?

Python: "import"更喜欢什么——模块还是包?

linux - 为什么 Linux 内核仓库只有一个分支?

python - 如何删除 Airflow 中的下游或上游任务依赖性

python - 安全的 Python 解释器?

Python:为列表中的所有元素添加相同的前缀

mysql - 放入并行管道时,任务未显示在 DAG 中

scheduler - Airflow 调度程序继续执行没有心跳的失败作业

workflow - 有没有办法在 sublime text 2 中平移?

python - 在 Python 中导入模块 - 最佳实践