python - 如何在airflow中设置多个Dag目录

标签 python airflow

我为不同的 python 项目设置了不同的 Airflow dags,即一个父 dags 文件夹 /vol/dags 带有基于不同 python 项目的 DAG 的子文件夹:/vol/dags/project1/project1.py,/vol/dags/project2/project2.py 其中 DAGS_FOLDER =/vol/dags

project1.py 例如从同一目录中的另一个 python 文件导入函数,即 /vol/dags/project1/mycalculator.py。但是当我启动 airflow 网络服务器时,我得到一个 ImportError:

/vol/dags/project1/$ airflow webserver -p 8080

INFO - Filling up the DagBag from /vol/dags/
ERROR - Failed to import: /vol/dags/project1/project1.py
Traceback (most recent call last):
  File "/Users/xxx/anaconda/lib/python2.7/site-packages/airflow/models.py", line 247, in process_file
    m = imp.load_source(mod_name, filepath)
  File "/vol/dags/project1/project1.py", line 10, in <module>
    from mycalculator import *
ImportError: No module named mycalculator

我尝试像这样将 mycalculator.py 导入到 project1.py:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators import PythonOperator
from datetime import datetime, timedelta
from mycalculator import *

dag = DAG(
    dag_id='project1', default_args=args,
    schedule_interval="@once")

最佳答案

您可以使用 packaged dag为不同的项目设置不同的 dag 文件夹的概念。您只需要将每个项目的 zip 放在您的父 dag 文件夹中。

通过这种方式,您可以轻松地将 dags 与其依赖项结合起来,并且您的 dag 文件夹将整洁干净,因为它只包含每个项目的 zip。

您可以创建一个如下所示的 zip:

my_dag1.py
my_dag2.py
package1/__init__.py
package1/functions.py

你的父 dag 文件夹看起来像这样:

project1.zip
project2.zip
my_dag3.py

关于python - 如何在airflow中设置多个Dag目录,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43218813/

相关文章:

python - 根据带有 basemap 的字典值对一个国家/地区的状态进行着色

python - 基于 NLTK 的词干提取和词形还原

python - 没有xcom的任务之间的 Airflow 通信

airflow - 单个 Airflow dag 中可以安排多少个任务?

python - 为 GKE 上的节点配置 net.core.somaxconn

python - 如何在 python 代码中指定 “tab”?

airflow - 通过 UI 将参数传递给 Airflow 的作业

google-cloud-functions - 从 Cloud Function 的文件到达事件触发 Composer DAG 上的任务

python - BaseOperator.xcom_pull 中的上下文参数是什么

python - 如何在嵌套函数中评估外部范围的变量?