airflow - 如何在 Airflow 中查找失败的上游任务数?

标签 airflow

我很难弄清楚如何为同一天(同一执行日)运行两次的同一 dag 找到失败的任务。

考虑一个示例,当具有 dag_id=1 的 dag 在第一次运行时失败(由于任何原因,可以说连接超时)并且任务失败。当我们尝试查询时,TaskInstance 表将包含失败任务的条目。太棒了!!

但是,如果我重新运行相同的 dag(注意 dag_id 仍然为 1),那么在最后一个任务中(它具有 ALL_DONE 的规则,因此无论上游任务是否失败或成功它将被执行)我想计算当前 dag_run 中失败的任务数,忽略以前的 dag_runs。我遇到了 dag_run id,如果我们可以将它与 TaskInstance 相关联,这可能很有用,但我做不到。任何建议/帮助表示赞赏。

最佳答案

在 Airflow 1.10.x 中,可以通过更简单的代码实现相同的结果,避免直接接触 ORM。

from airflow.utils.state import State

def your_python_operator_callable(**context):    
    tis_dagrun = context['ti'].get_dagrun().get_task_instances()
    failed_count = sum([True if ti.state == State.FAILED else False for ti in tis_dagrun])
    print(f"There are {failed_count} failed tasks in this execution"

一个不幸的问题是 context['ti'].get_dagrun() 在从 CLI 运行单个任务的测试时不返回 DAGRun 实例。结果,该单个任务的手动测试将失败,但标准运行将按预期工作。

关于airflow - 如何在 Airflow 中查找失败的上游任务数?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50613155/

相关文章:

Airflow 2 Helm Chart - 如何指定mysql连接字符串

airflow - 定义 Airflow DAG 任务排序的正确方法

airflow - 当源是 Oracle 时,通过 GUI 和增量更新更改 Airflow 计划

python - Airflow 通过 Snowflake session_parameters

python - 如何将 Airflow 与 Github 集成以运行脚本

airflow - Airflow 传感器中的模式 "reschedule"如何工作?

postgresql - 从外部连接 docker postgres (DBeaver)

带有 Airflow 的电子邮件

kubernetes - 将自定义 volumeMount 添加到 Airflow worker pod 中(使用 k8s Executor)

python-3.x - 在 apache Airflow 中初始化数据库时出错,下面是附加的错误,谢谢