我很难弄清楚如何为同一天(同一执行日)运行两次的同一 dag 找到失败的任务。
考虑一个示例,当具有 dag_id=1
的 dag 在第一次运行时失败(由于任何原因,可以说连接超时)并且任务失败。当我们尝试查询时,TaskInstance 表将包含失败任务的条目。太棒了!!
但是,如果我重新运行相同的 dag(注意 dag_id 仍然为 1),那么在最后一个任务中(它具有 ALL_DONE
的规则,因此无论上游任务是否失败或成功它将被执行)我想计算当前 dag_run 中失败的任务数,忽略以前的 dag_runs。我遇到了 dag_run id,如果我们可以将它与 TaskInstance 相关联,这可能很有用,但我做不到。任何建议/帮助表示赞赏。
最佳答案
在 Airflow 1.10.x 中,可以通过更简单的代码实现相同的结果,避免直接接触 ORM。
from airflow.utils.state import State
def your_python_operator_callable(**context):
tis_dagrun = context['ti'].get_dagrun().get_task_instances()
failed_count = sum([True if ti.state == State.FAILED else False for ti in tis_dagrun])
print(f"There are {failed_count} failed tasks in this execution"
一个不幸的问题是 context['ti'].get_dagrun()
在从 CLI 运行单个任务的测试时不返回 DAGRun 实例。结果,该单个任务的手动测试将失败,但标准运行将按预期工作。
关于airflow - 如何在 Airflow 中查找失败的上游任务数?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50613155/