python - Cannot connect to the Docker Postgres server from an Airflow DAG

Tags: python, docker, docker-compose, airflow

I'm trying to run an Airflow DAG that queries the dag table in the Airflow Postgres database. Here is the DAG code:

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.hooks.postgres_hook import PostgresHook
from datetime import datetime

default_args = {
    'owner': 'airflow',
    'depend_on_past': False,
    'start_date': datetime(year=2019, month=10, day=1),
    'retries': 0
}

def get_dag_table():
    query = 'SELECT * FROM dag LIMIT 5;'
    hook = PostgresHook(postgre_conn_id='postgres_default',
                        host='localhost',
                        database='airflow',
                        user='airflow',
                        password='airflow',
                        port=5432)
    connection = hook.get_conn()
    # COMMENTED OUT FOR DEBUGGING
    # cursor = connection.cursor()
    # cursor.execute(request)
    # return cursor.fetchall()

dag = DAG(
    "custom_postgres_tutorial",
    default_args=default_args,
    schedule_interval=None
)

start_task = DummyOperator(task_id='start_task', dag=dag)
postgres_task = PythonOperator(task_id='query_dag_table',
                               python_callable=get_dag_table,
                               dag=dag)
start_task >> postgres_task

Here are the steps I followed:

1) I cloned the Puckel docker-airflow repository ( https://github.com/puckel/docker-airflow ).

2) I then ran $ docker-compose -f docker-compose-LocalExecutor.yml up -d to start the Airflow webserver and the Postgres database.

3) I created a custom connection as shown below:

(screenshot: the custom connection configured in the Airflow connections UI)

4) When I trigger the DAG, I get the following error:

[2019-10-07 14:51:11,034] {{taskinstance.py:1078}} INFO - Marking task as FAILED.
[2019-10-07 14:51:11,050] {{base_task_runner.py:115}} INFO - Job 5229: Subtask query_dag_table Traceback (most recent call last):
[2019-10-07 14:51:11,050] {{base_task_runner.py:115}} INFO - Job 5229: Subtask query_dag_table   File "/usr/local/bin/airflow", line 32, in <module>
[2019-10-07 14:51:11,050] {{base_task_runner.py:115}} INFO - Job 5229: Subtask query_dag_table     args.func(args)
[2019-10-07 14:51:11,050] {{base_task_runner.py:115}} INFO - Job 5229: Subtask query_dag_table   File "/usr/local/lib/python3.7/site-packages/airflow/utils/cli.py", line 74, in wrapper
[2019-10-07 14:51:11,050] {{base_task_runner.py:115}} INFO - Job 5229: Subtask query_dag_table     return f(*args, **kwargs)
[2019-10-07 14:51:11,050] {{base_task_runner.py:115}} INFO - Job 5229: Subtask query_dag_table   File "/usr/local/lib/python3.7/site-packages/airflow/bin/cli.py", line 522, in run
[2019-10-07 14:51:11,050] {{base_task_runner.py:115}} INFO - Job 5229: Subtask query_dag_table     _run(args, dag, ti)
[2019-10-07 14:51:11,050] {{base_task_runner.py:115}} INFO - Job 5229: Subtask query_dag_table   File "/usr/local/lib/python3.7/site-packages/airflow/bin/cli.py", line 440, in _run
[2019-10-07 14:51:11,050] {{base_task_runner.py:115}} INFO - Job 5229: Subtask query_dag_table     pool=args.pool,
[2019-10-07 14:51:11,050] {{base_task_runner.py:115}} INFO - Job 5229: Subtask query_dag_table   File "/usr/local/lib/python3.7/site-packages/airflow/utils/db.py", line 74, in wrapper
[2019-10-07 14:51:11,050] {{base_task_runner.py:115}} INFO - Job 5229: Subtask query_dag_table     return func(*args, **kwargs)
[2019-10-07 14:51:11,050] {{base_task_runner.py:115}} INFO - Job 5229: Subtask query_dag_table   File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 922, in _run_raw_task
[2019-10-07 14:51:11,050] {{base_task_runner.py:115}} INFO - Job 5229: Subtask query_dag_table     result = task_copy.execute(context=context)
[2019-10-07 14:51:11,050] {{base_task_runner.py:115}} INFO - Job 5229: Subtask query_dag_table   File "/usr/local/lib/python3.7/site-packages/airflow/operators/python_operator.py", line 113, in execute
[2019-10-07 14:51:11,050] {{base_task_runner.py:115}} INFO - Job 5229: Subtask query_dag_table     return_value = self.execute_callable()
[2019-10-07 14:51:11,051] {{base_task_runner.py:115}} INFO - Job 5229: Subtask query_dag_table   File "/usr/local/lib/python3.7/site-packages/airflow/operators/python_operator.py", line 118, in execute_callable
[2019-10-07 14:51:11,051] {{base_task_runner.py:115}} INFO - Job 5229: Subtask query_dag_table     return self.python_callable(*self.op_args, **self.op_kwargs)
[2019-10-07 14:51:11,051] {{base_task_runner.py:115}} INFO - Job 5229: Subtask query_dag_table   File "/usr/local/airflow/dags/tutorial-postgres.py", line 23, in get_dag_table
[2019-10-07 14:51:11,051] {{base_task_runner.py:115}} INFO - Job 5229: Subtask query_dag_table     connection = hook.get_conn()
[2019-10-07 14:51:11,051] {{base_task_runner.py:115}} INFO - Job 5229: Subtask query_dag_table   File "/usr/local/lib/python3.7/site-packages/airflow/hooks/postgres_hook.py", line 75, in get_conn
[2019-10-07 14:51:11,051] {{base_task_runner.py:115}} INFO - Job 5229: Subtask query_dag_table     self.conn = psycopg2.connect(**conn_args)
[2019-10-07 14:51:11,051] {{base_task_runner.py:115}} INFO - Job 5229: Subtask query_dag_table   File "/usr/local/lib/python3.7/site-packages/psycopg2/__init__.py", line 130, in connect
[2019-10-07 14:51:11,051] {{base_task_runner.py:115}} INFO - Job 5229: Subtask query_dag_table     conn = _connect(dsn, connection_factory=connection_factory, **kwasync)
[2019-10-07 14:51:11,051] {{base_task_runner.py:115}} INFO - Job 5229: Subtask query_dag_table psycopg2.OperationalError: could not connect to server: Connection refused
[2019-10-07 14:51:11,051] {{base_task_runner.py:115}} INFO - Job 5229: Subtask query_dag_table  Is the server running on host "localhost" (127.0.0.1) and accepting
[2019-10-07 14:51:11,051] {{base_task_runner.py:115}} INFO - Job 5229: Subtask query_dag_table  TCP/IP connections on port 5432?
[2019-10-07 14:51:11,051] {{base_task_runner.py:115}} INFO - Job 5229: Subtask query_dag_table could not connect to server: Cannot assign requested address
[2019-10-07 14:51:11,051] {{base_task_runner.py:115}} INFO - Job 5229: Subtask query_dag_table  Is the server running on host "localhost" (::1) and accepting
[2019-10-07 14:51:11,051] {{base_task_runner.py:115}} INFO - Job 5229: Subtask query_dag_table  TCP/IP connections on port 5432?
[2019-10-07 14:51:11,051] {{base_task_runner.py:115}} INFO - Job 5229: Subtask query_dag_table 

I have tried every suggestion I could find online, but none of them fixed the problem. What confuses me is that I can connect to the database from PyCharm:

(screenshot: a successful connection to the database from PyCharm)

Likewise, when I run $ docker container ls I get the output below, which shows the Postgres container publishing port 5432:

CONTAINER ID        IMAGE                          COMMAND                  CREATED             STATUS                 PORTS                                        NAMES
xxxxxxxxxxxx        puckel/docker-airflow:1.10.4   "/entrypoint.sh webs…"   2 hours ago         Up 2 hours (healthy)   5555/tcp, 8793/tcp, 0.0.0.0:8080->8080/tcp   docker-airflow_webserver_1
xxxxxxxxxxxx        postgres:9.6                   "docker-entrypoint.s…"   2 days ago          Up 2 hours             0.0.0.0:5432->5432/tcp                       docker-airflow_postgres_1
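
A quick way to narrow this down: port 5432 is published to the host (which is why PyCharm can connect), but inside the webserver container localhost refers to that container itself, not to the Postgres container. The probe below is only a sketch; it reuses the airflow/airflow credentials from the DAG code and assumes the database service is named postgres, as it is in the Puckel compose file. It can be run with something like docker exec -it docker-airflow_webserver_1 python:

import psycopg2

# From inside the webserver container, "localhost" is the webserver
# container itself, while the Compose service name "postgres" resolves
# to the database container on the shared Docker network.
for host in ("localhost", "postgres"):
    try:
        conn = psycopg2.connect(host=host, port=5432, dbname="airflow",
                                user="airflow", password="airflow")
        print(host, "-> connected")
        conn.close()
    except psycopg2.OperationalError as exc:
        print(host, "-> failed:", exc)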

Best Answer

Try changing the Host field on the connection UI page from localhost to postgres (the Docker Compose service name) or host.docker.internal.
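
Some context on why the hook arguments in the DAG don't help: in Airflow 1.10, PostgresHook resolves host, credentials and port from the Airflow connection named by postgres_conn_id (the question passes a misspelled postgre_conn_id keyword), not from the host=/user=/password= arguments given to the hook. A minimal sketch of the callable, assuming the postgres_default connection's Host field has been set to postgres as suggested above:

from airflow.hooks.postgres_hook import PostgresHook

def get_dag_table():
    # Host, credentials and port are read from the 'postgres_default'
    # connection in the Airflow metadata DB, so its Host field must be
    # reachable from inside the container (e.g. "postgres").
    hook = PostgresHook(postgres_conn_id='postgres_default')
    connection = hook.get_conn()
    cursor = connection.cursor()
    cursor.execute('SELECT * FROM dag LIMIT 5;')
    return cursor.fetchall()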

Regarding "python - Cannot connect to the Docker Postgres server from an Airflow DAG", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/58272934/
