python - 无法为 apache Airflow 安装附加要求

标签 python docker docker-compose airflow

我正在使用以下 docker-compose 图像,我从以下位置获得此图像:https://github.com/apache/airflow/blob/main/docs/apache-airflow/start/docker-compose.yaml

version: "3"
x-airflow-common: &airflow-common
  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.0.0-python3.7}
  environment: &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
    AIRFLOW__CORE__FERNET_KEY: ""
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: "true"
    AIRFLOW__CORE__LOAD_EXAMPLES: "false"
    AIRFLOW__API__AUTH_BACKEND: "airflow.api.auth.backend.basic_auth"
    _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-apache-airflow-providers-apache-spark}
  volumes:
    - ./dags:/opt/airflow/dags
    - ./logs:/opt/airflow/logs
    - ./plugins:/opt/airflow/plugins
  user: "${AIRFLOW_UID:-50000}:${AIRFLOW_GID:-50000}"
  depends_on: &airflow-common-depends-on
    redis:
      condition: service_healthy
    postgres:
      condition: service_healthy

services:
  postgres:
    image: postgres:13
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    volumes:
      - postgres-db-volume:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD", "pg_isready", "-U", "airflow"]
      interval: 5s
      retries: 5
    restart: always

  redis:
    image: redis:latest
    ports:
      - 6379:6379
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 5s
      timeout: 30s
      retries: 50
    restart: always

  airflow-webserver:
    <<: *airflow-common
    command: webserver
    ports:
      - 8080:8080
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-scheduler:
    <<: *airflow-common
    command: scheduler
    healthcheck:
      test:
        [
          "CMD-SHELL",
          'airflow jobs check --job-type SchedulerJob --hostname "$${HOSTNAME}"',
        ]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-worker:
    <<: *airflow-common
    command: celery worker
    healthcheck:
      test:
        - "CMD-SHELL"
        - 'celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-init:
    <<: *airflow-common
    command: version
    environment:
      <<: *airflow-common-env
      _AIRFLOW_DB_UPGRADE: "true"
      _AIRFLOW_WWW_USER_CREATE: "true"
      _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
      _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}

  flower:
    <<: *airflow-common
    command: celery flower
    ports:
      - 5555:5555
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:5555/"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  ######################################################
  # SPARK SERVICES
  ######################################################

  jupyterlab:
    image: andreper/jupyterlab:3.0.0-spark-3.0.0
    container_name: jupyterlab
    ports:
      - 8888:8888
      - 4040:4040
    volumes:
      - shared-workspace:/opt/workspace
  spark-master:
    image: andreper/spark-master:3.0.0
    container_name: spark-master
    ports:
      - 8081:8080
      - 7077:7077
    volumes:
      - shared-workspace:/opt/workspace
  spark-worker-1:
    image: andreper/spark-worker:3.0.0
    container_name: spark-worker-1
    environment:
      - SPARK_WORKER_CORES=1
      - SPARK_WORKER_MEMORY=512m
    ports:
      - 8082:8081
    volumes:
      - shared-workspace:/opt/workspace
    depends_on:
      - spark-master
  spark-worker-2:
    image: andreper/spark-worker:3.0.0
    container_name: spark-worker-2
    environment:
      - SPARK_WORKER_CORES=1
      - SPARK_WORKER_MEMORY=512m
    ports:
      - 8083:8081
    volumes:
      - shared-workspace:/opt/workspace
    depends_on:
      - spark-master

volumes:
  postgres-db-volume:
  shared-workspace:
    name: "jordi_airflow"
    driver: local
    driver_opts:
      type: "none"
      o: "bind"
      device: "/Users/jordicrespoguzman/Projects/custom_airflow_spark/spark_folder"
我正在尝试运行以下 DAG:
from airflow import DAG
from airflow.providers.http.sensors.http import HttpSensor
from airflow.sensors.filesystem import FileSensor
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator

from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.operators.email import EmailOperator

from datetime import datetime, timedelta
import csv
import requests
import json

default_args = {
    "owner": "airflow",
    "email_on_failure": False,
    "email_on_retry": False,
    "email": "admin@localhost.com",
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}


def printar():
    print("success!")


with DAG(
    "forex_data_pipeline",
    start_date=datetime(2021, 1, 1),
    schedule_interval="@daily",
    default_args=default_args,
    catchup=False,
) as dag:

    downloading_rates = PythonOperator(task_id="test1", python_callable=printar)

    forex_processing = SparkSubmitOperator(
        task_id="spark1",
        application="/opt/airflow/dags/test.py",
        conn_id="spark_conn",
        verbose=False,
    )

    downloading_rates  >> forex_processing
但是我在 Airflow ui 中看到了这个错误:
Broken DAG: [/opt/airflow/dags/dag_spark.py] Traceback (most recent call last):
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "/opt/airflow/dags/dag_spark.py", line 7, in <module>
    from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
ModuleNotFoundError: No module named 'airflow.providers.apache'
我已指定在 docker-compose 文件中安装其他要求:
_PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-apache-airflow-providers-apache-spark}
我写错了吗?我应该如何指定我想在 Airflow 中安装的附加要求?我可以通过一个requirements.txt吗?如果是这样,我如何指定路径?

最佳答案

支持 _PIP_ADDITIONAL_REQUIREMENTS环境变量尚未发布。它仅受 docker 镜像的开发者/未发布版本支持。计划在 Airflow 2.1.1 中提供此功能。有关更多信息,请参阅:Adding extra requirements for build and runtime of the PROD image.
对于旧版本,您应该构建一个新图像并将该图像设置在 docker-compose.yaml 中。 .为此,您需要执行几个步骤。

  • 新建 Dockerfile具有以下内容:
    FROM apache/airflow:2.0.0
    RUN pip install --no-cache-dir apache-airflow-providers
    
  • 构建一个新镜像:
    docker build . --tag my-company-airflow:2.0.0
    
  • docker-compose.yaml 中设置此图像文件:
    echo "AIRFLOW_IMAGE_NAME=my-company-airflow:2.0.0" >> .env
    

  • 有关更多信息,请参阅:
    Official guide about running Airflow in docker-compose environment
    我特别推荐这个片段,它描述了安装新 pip 包时需要做什么。

    ModuleNotFoundError: No module named 'XYZ'

    The Docker Compose file uses the latest Airflow image (apache/airflow). If you need to install a new Python library or system library, you can customize and extend it.


    我建议您查看有关 building Docker Image 的指南.这解释了如何安装更复杂的依赖项。
    我还建议只使用官方网站上的 Docker-compose 文件,并针对特定版本。较新版本的 Docker-compose 文件可能不适用于较旧版本的 Airflow,因为我们一直在对这些文件进行许多改进,以提高稳定性可靠性和用户体验。

    关于python - 无法为 apache Airflow 安装附加要求,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/67851351/

    相关文章:

    python - 将 PyPy 编译为 Exe

    java - 如何使用 Dockerfile 安装特定版本的 Java 8

    docker - 无法构建克隆的 Jekyll 站点 - jekyll 3.8.5 |错误 : Permission denied @ dir_s_mkdir -/srv/jekyll/_site

    ruby-on-rails - 如何在 Docker 中运行 Rails? PG::ConnectionBad 无法将主机名 "pg"转换为地址:没有与主机名关联的地址

    python - Excel 电子表格的 Pandas groupby

    python - 如何在 Python 中创建 Mixin 工厂?

    java - 免费(最好是开源)语音到文本库的选项

    apache-spark - 使用docker登录到Zeppelin问题

    docker - docker stop和docker container stop有什么区别?

    mongodb - 当我运行 docker compose 时,我的 golang(摄取)容器无法显示 "Error establishing Mongo session"