python - 如何获取 Airflow dag 运行的 JobID?

标签 python pyspark airflow apache-airflow

当我们执行 dagrun 时,在 Airflow UI 上的“图 TableView ”中,我们会获得每个作业运行的详细信息。

JobID 类似于“scheduled__2017-04-11T10:47:00”

我需要这个 JobID 来跟踪和创建日志,我在其中维护每个任务/dagrun 花费的时间。

所以我的问题是如何在正在运行的同一个 dag 中获取 JobID

谢谢,切坦

最佳答案

这个值实际上叫做run_id,可以通过上下文或宏访问。

在 python 运算符中,这是通过上下文访问的,而在 bash 运算符中,这是通过 bash_command 字段上的 jinja 模板访问的。

有关宏中可用内容的更多信息:

https://airflow.apache.org/docs/stable/macros.html

关于神社的更多信息:

https://airflow.apache.org/docs/stable/concepts.html#jinja-templating

from airflow.models import DAG
from datetime import datetime
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator


dag = DAG(
    dag_id='run_id',
    schedule_interval=None,
    start_date=datetime(2017, 2, 26)
)

def my_func(**kwargs):
    context = kwargs
    print(context['dag_run'].run_id)

t1 = PythonOperator(
    task_id='python_run_id',
    python_callable=my_func,
    provide_context=True,
    dag=dag
    )

t2 = BashOperator(
    task_id='bash_run_id',
    bash_command='echo {{run_id}}',
    dag=dag)

t1.set_downstream(t2)

以这个dag为例,查看每个operator的log,应该可以看到log中打印了run_id

关于python - 如何获取 Airflow dag 运行的 JobID?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43345991/

相关文章:

airflow - Airflow 与 Kerberos 的集成如何工作?

python-3.x - 无法启动调度程序

python - 在 python 中动态更改迭代器是否安全?

python - 如何在pyspark中将管道分隔的文本文件转换为csv文件?

azure - maxRecordsPerFile 在 Azure Data Bricks 中不起作用

python - 使用Python的spark-on-k8s资源登台服务器

使用插件导入 DAG 时出现 Airflow 错误 - 只能在运算符(operator)之间设置关系

python - 如何创建一个将行移动负 1 的 DataFrame,包括上面的行可能不存在的时间?

python - 如何估计特定文档的查询的重要性?

python - 从数组中形成成对的非连续元素