airflow - 以编程方式清除 Airflow 任务实例的状态

标签 airflow airflow-scheduler

我想在 DAG A 完成执行时清除 DAG B 中的任务。 A 和 B 都是预定 DAG。

是否有任何运算符/方式来清除任务状态并以编程方式重新运行 DAG B?


我知道 CLI option和用于清除任务的 Web UI 选项。

最佳答案

我建议在这里远离 CLI!

与通过 BashOperator 和/或 CLI 模块相比,在引用对象时,dags/tasks 的 Airflow 功能可以更好地暴露。

向名为“clear_dag_b”的 dag A 添加一个 python 操作,它从 dags 文件夹(模块)导入 dag_b 并且:

from dags.dag_b import dag as dag_b

def clear_dag_b(**context):
   exec_date = context[some date object, I forget the name]
   dag_b.clear(start_date=exec_date, end_date=exec_date) 

重要!如果您出于某种原因不匹配重叠 dag_b 计划时间与 start_date/end_date,clear() 操作会错过 dag 处决。此示例假设 dag AB 安排相同,并且您只想从 B 清除第 X 天,当 A 执行第 X

在清除之前检查 dag_b 是否已经运行可能是有意义的:

dab_b_run = dag_b.get_dagrun(exec_date) # returns None or a dag_run object

关于airflow - 以编程方式清除 Airflow 任务实例的状态,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58180281/

相关文章:

python - Airflow 回填和新的 dag 运行

Airflow - 无法从工作人员获取日志文件。 404 客户端错误 : NOT FOUND for url

airflow - Airflow 能否持续访问短期动态生成任务的元数据?

Airflow DAG 并行任务延迟/执行延迟 60 秒

airflow-scheduler - Airflow 远程文件传感器

restart - 如何在 Airflow 1.8 上出现故障时重新启动 dag?

python - 导入错误: No module named json

airflow - Airflow :PythonOperator:为什么要包含 'ds' arg?

python - Airflow Scheduler 为同一个 dag 创建 PID 以每次生成任务

owner - 我应该如何在 Airflow 中使用正确的所有者任务?