airflow - 异常如何传递给on_failure_callback?

标签 airflow

我想将异常传递给 on_failure_callback 以检查错误是什么。例如,如果某个 DAG 中包含“存在重复项”,则该函数将不会执行任何操作。否则,它将发送一封电子邮件。

但是,我看不到异常的格式。我在 Docker 中使用 Airflow 2.1.2,并作为我的 dag 定义如下:

with DAG(process_name,
         default_args=default_args,
         schedule_interval='@daily',
         max_active_runs=1,
         tags=['import', 'es'],
         on_failure_callback=known_error_dag
         ) as dag:
    operators

已尝试以下解决方案:

def known_error_dag(context):
    #  1
    ti = context['ti']
    ti.xcom_push(key='exception', value=context['exception'])

    #  2
    print(context['exception'])
    
    #  3
    logging.info(context['exception'])

我在 UI 和 docker 日志中都看不到异常。而且,它也没有出现在 XCOM 中。

这个问题的答案不清楚我想要的是否可能:Get Exception details on Airflow on_failure_callback context

然而,天文学家类(class)表明这确实是可能的。 https://academy.astronomer.io/astronomer-certification-apache-airflow-dag-authoring-preparation

最佳答案

您可以在 DAG 和任务级别定义 on_failure_callback。异常仅传递给任务级别的失败回调,因此可以在您的运算符上配置回调,或者通过 DAG 上的 default_args 向所有运算符配置回调:

with DAG(
    process_name,
    default_args=default_args,
    schedule_interval='@daily',
    max_active_runs=1,
    tags=['import', 'es'],
    default_args={
        "on_failure_callback": known_error_dag,
    },
) as dag:

在 DAG 级别定义的 on_failure_callback 也将采用上下文变量,其中包括一个键“reason”,但仅在出现以下情况时说明“task_failure” DAG 运行失败,因此在大多数情况下不是很有用。

关于airflow - 异常如何传递给on_failure_callback?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/68903306/

相关文章:

airflow - 在循环中的任务之后运行 Airflow 任务,而不是在循环中的所有任务之后运行

python - 在 BranchPython Operator 之后跳过 Airflow 2.0 任务

python - 如何在 Airflow 中基于另一个 AWS-glue 任务成功完成在 Airflow 中启动 python operator boto3 AWS-glue 任务?

python - 在Python/Airflow中加密数据并在BigQuery中解密的方法

python - 了解 apache Airflow 中的 TreeView

parent-child - 清除 dag 内 Airflow 中的上游任务

python - Dataproc 配置单元运算符(operator)未运行存储在存储桶中的 hql 文件

kubernetes - k8和Google运营商的 Airflow :信誉验证

python - 如何更改 Airflow 网络服务器中的 dag_default_view?

kernel - 错误 : jupyter_client. kernelspec.NoSuchKernel:在作为 docker 容器运行的 Airflow/papermill 中没有出现名为 python3 的内核