python - 使用数据库信息构建动态 DAG

标签 python airflow airflow-scheduler

我是 Airflow 的新手,我正在尝试找出使用从数据库检索的信息动态创建一组 DAG 的最佳方法。 目前我已经想到了这个可能的解决方案:

# file: dags_builder_dag.py in DAG_FOLDER

# Get info to build required dags from DB
dag_info = api_getDBInfo()
# Dynamically create dags based on info retrieved
for dag in dag_info:
    dag_id = 'hello_world_child_{}'.format(str(dag['id']))
    default_args_child = {'owner': 'airflow', 'start_date': datetime(2021, 1, 1)}
    # Add dag to global scope to let airflow digest it.
    globals()[dag_id] = create_dag(dag_id, default_args_child)

但是,如果我没记错的话,所有的 dag 文件,包括本例中生成所有 dag 的文件 (dags_builder_dag.py),都会被 Airflow 定期解析,这意味着 api_getDBInfo() 将被执行在每次解析时。如果我是对的,最好的做法是避免连续执行 api_getDBInfo(),这对数据库来说可能是一个耗时的操作?理想情况下,应仅在需要时检索此信息,比方说在手动触发时。

我想到的其他可能的解决方法:

  • 使用 Airlfow Variable作为评估是否该再次解析的标志 dags_builder_dag.py 该变量可以按以下方式使用:
# file: dags_builder_dag.py in DAG_FOLDER

buildDAGs = Variables.get('buildDAGs')
if buildDAGs == 'true':
  # Get info to build required dags from DB
  dag_info = api_getDBInfo()
  # Dynamically create dags based on info retrieved
  for dag in dag_info:
      dag_id = 'hello_world_child_{}'.format(str(dag['id']))
      default_args_child = {'owner': 'airflow', 'start_date': datetime(2021, 1, 1)}
      # Add dag to global scope to let airflow digest it.
      globals()[dag_id] = create_dag(dag_id, default_args_child)
  • 将 airflow.cfg 文件的参数 min_file_process_interval 设置为更高的值,以避免连续解析。但是,这也有增加 dags 运行时延迟的缺点。

更新

感谢@NicoE 和@floating_hammer,我找到了适合我的用例的解决方案。

第一次尝试:Airflow 变量作为缓存

我可以使用 Airflow 变量作为存储在数据库中的数据的缓存,以避免连续调用“api_getDBInfo”。然而,通过这种方式,我遇到了另一个瓶颈:可变大小。 Airflow 变量是键值对。键的长度为 - 256。存储在元数据中的值将受到元数据数据库支持的字符串大小的限制。 https://github.com/apache/airflow/blob/master/airflow/models/variable.py https://github.com/apache/airflow/blob/master/airflow/models/base.py

在我的例子中,我使用的是 Amazon MWAA以及与 aws 使用的底层元数据数据库及其结构相关的详细信息可能很难找到(实际上我并没有尝试进行大量调查)。所以我只是执行了一个压力测试,在变量中强制输入大量数据,看看会发生什么。下面是结果:

<表类="s-表"> <头> 数据量 结果 <正文> ~0,5 MB(当前) 写入和读取操作没有问题。 ~50 MB (x100) 写入和读取操作没有问题。 ~125 MB (x250) 写入和读取操作没有问题,但是使用 airflow 的 Web 控制台无法访问变量部分。服务器返回错误502“Bad gateway” ~250 MB (x500) 写入变量失败

第二次尝试:S3文件作为缓存

Airflow 变量有一个限制,正如之前的测试所示,所以我尝试保持相同的模式,使用 S3 文件更改 Airflow 变量,考虑到 S3 没有,这对我的特定用例很有效' Airflow 变量没有空间限制。

总结一下:

  1. 我创建了一个名为“sync_db_cache_dag”的 dag,它每小时使用 api_getDBInfo() 检索的数据更新 S3“db_cache.json”。数据以 JSON 格式存储。
  2. 脚本“dags_builder_dag.py”现在从“db_cache.json”中检索数据,这样数据库就无需连续调用“api_getDBInfo”。

最佳答案

您可以尝试以下步骤。

  • 创建一个变量,用于保存任务配置和要创建的任务数量。

创建一个以设定频率触发的 DAG。 dag 有两个任务。

  • 任务 1 读取数据库并填充变量。
  • 任务 2 读取变量并创建多个任务。

关于python - 使用数据库信息构建动态 DAG,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/66528113/

相关文章:

airflow - 我们应该让 Airflow 调度程序运行多长时间?

Python:如何聚合 DataFrame 中的值

python - PyQt5 不同选项卡上的不同网格

重命名任务后 Airflow dag 卡住

python - Airflow 在单个 DAG 中生成动态任务,任务 N+1 依赖于任务 N

cron - 手动触发 Airflow DAG 会干扰预定的 Airflow 触发吗?

python - 仅当代码不在函数或类中时,使用 smtplib 向 mailtrap 发送电子邮件才有效

python - DataFrame .loc 泄漏内存

pandas - Apache Airflow 或 Apache Beam 用于数据处理和作业调度

jobs - Apache Oozie 和 Apache Airflow 选择哪一个?需要比较