airflow - 如何捕获 Airflow 中调用的 DAG 中传递的 --conf 参数

标签 airflow

我正在尝试从 REST API 运行 DAG 并向其传递一些参数。 DAG 应该能够捕获参数并使用它。问题是我能够从 REST API 触发 DAG,但 DAG 无法捕获传递的参数。有没有办法实现这个目标?

我从 REST API 触发 DAG,如下所示。它传递 --conf 中的参数

http://abcairflow.com:8090/admin/rest_api/api?api=trigger_dag\&dag_id=trigger_test_dag\&conf=%7B%22key%22%3A%2

如何捕获被调用的DAG中conf值中传递的值。据我所知,conf 应该采用 URL 编码的 JSON 格式数据。

DAG 代码:`

def run_this_func(**kwargs):
print(kwargs)

run_this = PythonOperator(
    task_id='run_this',
    python_callable=run_this_func,
    dag=dag
)`

最佳答案

传递的外部参数是dag_run对象的一部分。可以通过以下方式访问它们:

API Request

import requests

headers = {
    'Cache-Control': 'no-cache',
    'Content-Type': 'application/json',
}

data = '{"conf":"{\\"Key1\\":\\"Value1\\"}"}'

response = requests.post('http://localhost:8080/api/experimental/dags/<dag_id>/dag_runs', headers=headers, data=data)

DAG

 def run_this_func(**context):
    print("Received {} for key=message".format(context["dag_run"].conf))


run_this = PythonOperator(task_id="run_this", python_callable=run_this_func, provide_context=True, dag=dag)

关于airflow - 如何捕获 Airflow 中调用的 DAG 中传递的 --conf 参数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53339912/

相关文章:

airflow - 使用受 IAP 保护的应用验证 Google Composer http 调用任务

python - 使用 Airflow 中的另一个 dag 触发外部 dag

python - python 正则表达式中的变量?在 Airflow 中打印报表?

python - Airflow scheduler 是否有可能在开始下一天之前先完成前一天的周期?

docker - 在 docker compose 中更改容器端口

python - 使用 Apache Airflow 执行包含 PySpark 代码的 Databricks Notebook

python - 在一个 DAG 中执行顺序和并发任务

python - Airflow BashOperator 收集返回码

python - 我可以 get() 或 xcom.pull() Airflow 脚本的 MAIN 部分中的变量(在 PythonOperator 之外)吗?

Airflow Dag id zhihu_qr_dag 仍在 DagBag 中。先删除DAG文件