python - 如何在 Python 脚本中触发 Airflow DAG 运行?

标签 python python-3.x airflow directed-acyclic-graphs

使用 apache Airflow ,我创建了一些 DAGS,其中一些不按计划运行。
我正在尝试找到一种方法,可以从 Python 脚本中触发特定 DAG 的运行。这可能吗?我能怎么做?

编辑 --- python 脚本将从与我所有 DAGS 所在的项目不同的项目中运行

最佳答案

在触发 Airflow DAG 运行时,您有多种选择。
使用 Python
Airflow python 包提供了一个 local client您可以用于从 python 脚本中触发 dag。例如:

from airflow.api.client.local_client import Client

c = Client(None, None)
c.trigger_dag(dag_id='test_dag_id', run_id='test_run_id', conf={})
使用 Airflow CLI
您可以使用 Airflow CLI 手动触发 Airflow 中的 dag。有关如何使用 CLI 触发 DAG 的更多信息,请访问 here .
使用 Airflow REST API
您还可以使用 Airflow REST api 来触发 DAG 运行。更多信息 here .

python 中的第一个选项可能最适合你(这也是我个人过去的做法)。但理论上你可以使用 subprocess从 python 或类似 requests 的库与 CLI 交互从 Python 中与 REST API 交互。

关于python - 如何在 Python 脚本中触发 Airflow DAG 运行?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60055151/

相关文章:

python - np.from函数 : Reference other arrays

python - 操作系统错误 : exception: access violation reading with Cyphon basic example

python-3.x - 如何在 geopanda 中的两点之间应用一条线,例如2个城市之间

python-3.x - 当函数返回特定类型的对象或 None 时,指定什么类型提示?

google-cloud-platform - 如何使用部署在 GCP Cloud Composer 上的 Airflow Stable Rest API [Airflow version 2.0.0]

python - 值错误 : invalid literal for int() with base 10: '30.0' when running unittest

python - 将 NumPy 数组转换为列表时 float 不一致

python - 如何从 Python + SQLAlchemy 连接到高可用性 SQL Server

python - 我的推文的转发

java - 如何使用 docker-compose.yaml 在 Airflow 容器中安装 java