python - Airflow 从与执行文件相同的指定路径中的文件导入时出现问题

标签 python airflow

为了有效地构建我的代码,我使用 sys.path += [ PATH_TO_MODULE ] 将函数导入到我的 DAG 中。

结构是这样的:

- MODULE
 |_run.py
 |_aux_functions.py
 |_config.py

aux_functions 中导入 config 时抛出错误。

aux_functions 导入到 run.py 中。

DAG 代码:

from airflow.operators.python_operator import PythonOperator
from airflow                           import DAG

from MODULE_PATHS import PATH_TO_MODULE
import datetime
import sys

sys.path += [ PATH_TO_MODULE ]

from run import run_function

default_args = { "start_date": datetime.datetime( 2018, 8, 20 ) }

with DAG( "run_dag_v1", default_args= default_args, schedule_interval= "0 0 * * *", ) as dag:
    task = PythonOperator( task_id= "run_function", python_callable= run_function, provide_context= True )

task

run.py 导入 aux_functions,它在运行期间从配置导入设置,但不起作用。显示的错误是: 导入错误:无法导入名称“设置”

# run.py
import aux_functions
.....
def run_function( **kwargs ):
    .....


# aux_functions.py
from config import settings
.....


# config.py
settings = { ..... }

当代码仅在 shell 中执行时,它可以正常工作,没有任何问题,但是当它通过 Airflow 运行时,它会不断显示此导入错误。

测试是通过以下方式完成的:airflow test run_dag_v1 run_function 2018-8-21

如果我将设置config移动到aux_functions,它工作正常,但为什么会发生这种情况以及如何避免它?

最佳答案

所以目录结构是这样的:

|_MODULE_1
|   |_ run.py
|   |_ aux_functions.py
|   |_ config.py
|   
|_MODULE_2
    |_ code.py
    |_ config.py

发生的情况是一个 DAG 正在导入 sys.path += [ PATH_MODULE_1 ]另一个 DAG 正在导入 sys.path += [ PATH_MODULE_2 ] .

在 Airflow 运行时,它会具有错误配置的路径,因此任何一个 DAG 总是失败。因此会出现导入错误,因为其他配置没有 settings目的。它们从来没有同时正确运行过。

解决方案只是确保在每种情况下,import config是相对的:import .config 。通过这样做,Airflow 现在可以识别正确的 config.py在每种情况下。

我发现管理此问题的最佳方法是导入父文件夹路径 from MODULE_PATHS import PATH_MODULES_DIRECTORY并添加 __init__.py文件到模块,以及在调用同一模块内的其他文件时仅使用相对导入。

所以新的 DAG 都具有相同的 sys.path所有模块都可以通过模块名称导入。

MODULES_DIRECTORY
    |
    |_MODULE_1
    |   |_ __init__.py
    |   |_ run.py
    |   |_ aux_functions.py
    |   |_ config.py
    |   
    |_MODULE_2
        |_ __init__.py
        |_ code.py
        |_ config.py


from airflow.operators.python_operator import PythonOperator
from airflow                           import DAG

from MODULE_PATHS import PATH_MODULES_DIRECTORY
import datetime
import sys

sys.path += [ PATH_MODULES_DIRECTORY ]

from MODULE_1.run import run_function

default_args = { "start_date": datetime.datetime( 2018, 8, 20 ) }

with DAG( "run_dag_v1", default_args= default_args, schedule_interval= "0 0 * * *", ) as dag:
    task = PythonOperator( task_id= "run_function", python_callable= run_function, provide_context= True )

task


# run.py
import .aux_functions
.....
def run_function( **kwargs ):
    .....


# aux_functions.py
from .config import settings
.....


# config.py
settings = { ..... }

关于python - Airflow 从与执行文件相同的指定路径中的文件导入时出现问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51977284/

相关文章:

python - Pyinstaller: 无法打开共享对象 libpython3.5m.so.1.0

python - 如何使用线性插值法对 numpy 数组进行插值

python - 失败的 Airflow DAG 任务是否可以使用更改的参数重试

python-3.x - 如何从使用DockerOperator执行的容器内部获取日志?

Airflow - 在同一个 DAG 中使用 TaskGroup 和 PythonBranchOperator

python - 如何更改相对导入搜索路径

python - 为什么 rst2html5 会搞乱编码?

Windows 上的 Python 和 vim?

python - Airflow,我如何使用 BashOperator 通过 python name.py 运行 .py 文件

postgresql - Airflow PostgresOperator 设置 ON_ERROR_STOP