我想在 DAG A 完成执行时清除 DAG B 中的任务。 A 和 B 都是预定 DAG。
是否有任何运算符
/方式来清除任务状态并以编程方式重新运行 DAG B?
我知道 CLI option和用于清除任务的 Web UI 选项。
最佳答案
我建议在这里远离 CLI!
与通过 BashOperator 和/或 CLI 模块相比,在引用对象时,dags/tasks 的 Airflow 功能可以更好地暴露。
向名为“clear_dag_b”的 dag A 添加一个 python 操作,它从 dags 文件夹(模块)导入 dag_b 并且:
from dags.dag_b import dag as dag_b
def clear_dag_b(**context):
exec_date = context[some date object, I forget the name]
dag_b.clear(start_date=exec_date, end_date=exec_date)
重要!如果您出于某种原因不匹配或重叠 dag_b 计划时间与 start_date/end_date,clear() 操作会错过 dag 处决。此示例假设 dag A 和 B 安排相同,并且您只想从 B 清除第 X 天,当 A 执行第 X
日在清除之前检查 dag_b 是否已经运行可能是有意义的:
dab_b_run = dag_b.get_dagrun(exec_date) # returns None or a dag_run object
关于airflow - 以编程方式清除 Airflow 任务实例的状态,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58180281/