为了有效地构建我的代码,我使用 sys.path += [ PATH_TO_MODULE ]
将函数导入到我的 DAG 中。
结构是这样的:
- MODULE
|_run.py
|_aux_functions.py
|_config.py
在 aux_functions
中导入 config
时抛出错误。
aux_functions
导入到 run.py
中。
DAG 代码:
from airflow.operators.python_operator import PythonOperator
from airflow import DAG
from MODULE_PATHS import PATH_TO_MODULE
import datetime
import sys
sys.path += [ PATH_TO_MODULE ]
from run import run_function
default_args = { "start_date": datetime.datetime( 2018, 8, 20 ) }
with DAG( "run_dag_v1", default_args= default_args, schedule_interval= "0 0 * * *", ) as dag:
task = PythonOperator( task_id= "run_function", python_callable= run_function, provide_context= True )
task
run.py
导入 aux_functions
,它在运行期间从配置导入设置
,但不起作用。显示的错误是:
导入错误:无法导入名称“设置”
。
# run.py
import aux_functions
.....
def run_function( **kwargs ):
.....
# aux_functions.py
from config import settings
.....
# config.py
settings = { ..... }
当代码仅在 shell 中执行时,它可以正常工作,没有任何问题,但是当它通过 Airflow 运行时,它会不断显示此导入错误。
测试是通过以下方式完成的:airflow test run_dag_v1 run_function 2018-8-21
如果我将设置
从config
移动到aux_functions
,它工作正常,但为什么会发生这种情况以及如何避免它?
最佳答案
所以目录结构是这样的:
|_MODULE_1
| |_ run.py
| |_ aux_functions.py
| |_ config.py
|
|_MODULE_2
|_ code.py
|_ config.py
发生的情况是一个 DAG 正在导入 sys.path += [ PATH_MODULE_1 ]
另一个 DAG 正在导入 sys.path += [ PATH_MODULE_2 ]
.
在 Airflow 运行时,它会具有错误配置的路径,因此任何一个 DAG 总是失败。因此会出现导入错误,因为其他配置没有 settings
目的。它们从来没有同时正确运行过。
解决方案只是确保在每种情况下,import config
是相对的:import .config
。通过这样做,Airflow 现在可以识别正确的 config.py
在每种情况下。
我发现管理此问题的最佳方法是导入父文件夹路径 from MODULE_PATHS import PATH_MODULES_DIRECTORY
并添加 __init__.py
文件到模块,以及在调用同一模块内的其他文件时仅使用相对导入。
所以新的 DAG 都具有相同的 sys.path
所有模块都可以通过模块名称导入。
MODULES_DIRECTORY
|
|_MODULE_1
| |_ __init__.py
| |_ run.py
| |_ aux_functions.py
| |_ config.py
|
|_MODULE_2
|_ __init__.py
|_ code.py
|_ config.py
from airflow.operators.python_operator import PythonOperator
from airflow import DAG
from MODULE_PATHS import PATH_MODULES_DIRECTORY
import datetime
import sys
sys.path += [ PATH_MODULES_DIRECTORY ]
from MODULE_1.run import run_function
default_args = { "start_date": datetime.datetime( 2018, 8, 20 ) }
with DAG( "run_dag_v1", default_args= default_args, schedule_interval= "0 0 * * *", ) as dag:
task = PythonOperator( task_id= "run_function", python_callable= run_function, provide_context= True )
task
# run.py
import .aux_functions
.....
def run_function( **kwargs ):
.....
# aux_functions.py
from .config import settings
.....
# config.py
settings = { ..... }
关于python - Airflow 从与执行文件相同的指定路径中的文件导入时出现问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51977284/