我想将异常传递给 on_failure_callback 以检查错误是什么。例如,如果某个 DAG 中包含“存在重复项”,则该函数将不会执行任何操作。否则,它将发送一封电子邮件。
但是,我看不到异常的格式。我在 Docker 中使用 Airflow 2.1.2,并作为我的 dag 定义如下:
with DAG(process_name,
default_args=default_args,
schedule_interval='@daily',
max_active_runs=1,
tags=['import', 'es'],
on_failure_callback=known_error_dag
) as dag:
operators
已尝试以下解决方案:
def known_error_dag(context):
# 1
ti = context['ti']
ti.xcom_push(key='exception', value=context['exception'])
# 2
print(context['exception'])
# 3
logging.info(context['exception'])
我在 UI 和 docker 日志中都看不到异常。而且,它也没有出现在 XCOM 中。
这个问题的答案不清楚我想要的是否可能:Get Exception details on Airflow on_failure_callback context
然而,天文学家类(class)表明这确实是可能的。 https://academy.astronomer.io/astronomer-certification-apache-airflow-dag-authoring-preparation
最佳答案
您可以在 DAG 和任务级别定义 on_failure_callback
。异常仅传递给任务级别的失败回调,因此可以在您的运算符上配置回调,或者通过 DAG 上的 default_args
向所有运算符配置回调:
with DAG(
process_name,
default_args=default_args,
schedule_interval='@daily',
max_active_runs=1,
tags=['import', 'es'],
default_args={
"on_failure_callback": known_error_dag,
},
) as dag:
在 DAG 级别定义的 on_failure_callback
也将采用上下文变量,其中包括一个键“reason
”,但仅在出现以下情况时说明“task_failure” DAG 运行失败,因此在大多数情况下不是很有用。
关于airflow - 异常如何传递给on_failure_callback?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/68903306/