python - Airflow 无法导入自定义 python 包

标签 python airflow importerror

我想通过自定义 python 项目中的 Airflow 调用脚本

我的目录结构是:

/home/user/
      ├──airflow/
      │  ├──dags
             ├──.venv_airflow (virtual environment for airflow)
      │      ├──my_dag.py
      ├──my_project
         ├──.venv (virtual environment for my_project)
         ├──folderA
            ├──__init__.py
            ├──folderB
               ├──call_me.py (has a line "from my_project.folderA.folderB import import_me")
               ├──import_me.py

我的 dag 文件如下所示:

from airflow import DAG
import datetime as dt
from airflow.operators.bash_operator import BashOperator

default_args = {
    'owner': 'arpita',
    'start_date': dt.datetime(2019, 11, 20),
    'retries': 1,
    'retry_delay': dt.timedelta(minutes=5),
    'depends_on_past': False,
    'email': ['example@abc.com'],
    'email_on_failure': True,
    'email_on_retry': True,
}

with DAG('sample',
         default_args=default_args,
         schedule_interval='30 * * * *',
         ) as dag:
    enter_project = BashOperator(task_id='enter_project',
                                 bash_command='cd /home/user/my_project',
                                 retries=2)
    setup_environment = BashOperator(task_id='setup_environment',
                                     bash_command='source /home/user/my_project/.venv/bin/activate',
                                     retries=2)
    call_script = BashOperator(task_id='call_script',
                                 bash_command='python -m my_project.folderA.folderB.call_me,
                                 retries=2)

enter_project >> setup_environment >> call_script

但我收到此错误

[2019-11-22 11:56:49,311] {bash_operator.py:115} INFO - Running command: python -m my_project.folderA.folderB.call_me
[2019-11-22 11:56:49,315] {bash_operator.py:124} INFO - Output:
[2019-11-22 11:56:49,349] {bash_operator.py:128} INFO - /home/user/airflow/.venv/bin/python: Error while finding spec for 'my_project.folderA.folderB.call_me' (ImportError: No module named 'my_project')

项目和脚本在 Airflow 之外工作。在airflow中,它导入其他包,例如pandas和tensorflow,但不导入自定义包。我尝试使用 sys.path.insert 插入路径,但这不起作用。感谢您的阅读:)

最佳答案

您的 bash 命令在三个单独的 bash 运算符中运行。它应该在一个中运行。

call_script = BashOperator(
    task_id='call_script',
    bash_command='cd /home/user/my_project;'
                 'source /home/user/my_project/.venv/bin/activate;'
                 'python -m my_project.folderA.folderB.call_me',
    retries=2)

关于python - Airflow 无法导入自定义 python 包,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58993999/

相关文章:

python - 如何设置Airflow发送邮件?

java:仅在构建后出现包不存在错误

python - SWIG - python 中的 C++ 代码

python - SQLAlchemy ORM 从子查询中选择多个实体

Python - 使用 mailto 打开默认邮件客户端,有多个收件人

返回比 Linux `wc -l` 高得多的行数的 Python 代码

python - 如何从更深的目录中的文件导入模块?

python - pdflatex 在大量数字后挂起

json - Airflow |设置变量

祖 parent 任务的 Airflow trigger_rule