python - 如何在不耗尽连接的情况下在 Airflow 中将 DatabaseHook 对象与 PythonOperator 一起使用?

标签 python database-connection airflow

我正在尝试使用 Airflow Connections 存储我的数据库凭据,并将它们与 PythonOperators 一起使用。我注意到,如果我将凭据传递给 PythonOperator,那么每个变量都会被记录下来,包括数据库密码。因此,根据下面的示例,我开始将连接对象本身传递给 PythonOperator。

但我现在遇到的问题是 Airflow 会产生大量这样的对象,即使这个 dag 只计划每天运行,导致经常出现达到连接限制的问题。 如何在不为 Airflow 中的数据脚本使用大量连接的情况下将 PostgresHook 与 PythonOperator 结合使用?

import sys
from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.python_operator import PythonOperator
from airflow.hooks.postgres_hook import PostgresHook

try:
    sys.path.append('/path/to/my/awesome/module/')
    from awesome_module import function_1, function_1
except:
    raise ImportError("Couldn't import awesome_module")

postgres_hook_object = PostgresHook("dedicated_bot_account")


with postgres_hook_object.get_conn() as con:
    t1 = PythonOperator(
            task_id = 'function_1',
            python_callable = function_1, 
            dag = dag,
            op_kwargs = {'conn':con}
            )

    t2 = PythonOperator(
            task_id = 'function_2',
            python_callable = function_2,
            dag = dag,
            op_args = [con, service]
            )

最佳答案

从 Airflow Slack 我了解到 DAG 中的代码以调度程序的频率运行,因此每次调度程序刷新 DAG 时都会打开多个连接。

似乎最佳做法是确保连接仅在任务运行时通过以下任一方式打开:

  1. 如果任务在 DAG 中定义,则将连接打开代码移动到 Python 函数定义中
  2. 如果任务在别处定义,则在任务中打开连接。 请注意,如果通过明文作为变量传递连接信息,则会记录下来

关于python - 如何在不耗尽连接的情况下在 Airflow 中将 DatabaseHook 对象与 PythonOperator 一起使用?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60403691/

相关文章:

airflow - API 请求返回 - Airflow 404 = 很多圆圈

python - 使用 dlib 面部标志检测仅将下巴保存为图像,其余部分为透明

python - 对列表中的连续数字求和。 Python

java - 将 Java 类连接到 Derby 数据库可提高池连接的性能

intellij-idea - 如何解决问题 : "The specified database user/password combination is rejected" using Intellij IDEA?

python - AirFlow 调度程序 - 运行日期

airflow - 如何将 xcom 作为 PostgresOperator 参数获取?

Python:如何在随机多项选择中跟踪 "correct"答案

python - 在列表列表中,如何找到与内部列表关联的值的平均值?

java - 超出最大游标数 SQLException-- 配置问题或游标泄漏?