当我们执行 dagrun 时,在 Airflow UI 上的“图 TableView ”中,我们会获得每个作业运行的详细信息。
JobID 类似于“scheduled__2017-04-11T10:47:00”。
我需要这个 JobID 来跟踪和创建日志,我在其中维护每个任务/dagrun 花费的时间。
所以我的问题是如何在正在运行的同一个 dag 中获取 JobID。
谢谢,切坦
最佳答案
这个值实际上叫做run_id
,可以通过上下文或宏访问。
在 python 运算符中,这是通过上下文访问的,而在 bash 运算符中,这是通过 bash_command
字段上的 jinja 模板访问的。
有关宏中可用内容的更多信息:
https://airflow.apache.org/docs/stable/macros.html
关于神社的更多信息:
https://airflow.apache.org/docs/stable/concepts.html#jinja-templating
from airflow.models import DAG
from datetime import datetime
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
dag = DAG(
dag_id='run_id',
schedule_interval=None,
start_date=datetime(2017, 2, 26)
)
def my_func(**kwargs):
context = kwargs
print(context['dag_run'].run_id)
t1 = PythonOperator(
task_id='python_run_id',
python_callable=my_func,
provide_context=True,
dag=dag
)
t2 = BashOperator(
task_id='bash_run_id',
bash_command='echo {{run_id}}',
dag=dag)
t1.set_downstream(t2)
以这个dag为例,查看每个operator的log,应该可以看到log中打印了run_id
。
关于python - 如何获取 Airflow dag 运行的 JobID?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43345991/