我们有很多 DAG 在 Airflow 上运行。当某些事情失败时,我们希望得到通知,或者做出特定的操作:我已经尝试通过装饰器
def on_failure_callback(f):
@wraps(f)
def wrap(*args, **kwargs):
try:
return f(*args, **kwargs)
except Exception as e:
return f"An exception {e} on ocurred on{f}"
return wrap
这可行,但有必要装饰我们想要具有此行为的任何函数。
我看到了this并尝试像这样实现它:
def on_failure_callback(context):
operator = PythonOperator(
python_callable=failure)
return operator.execute(context=context)
def failure():
return 'Failure in the failure func'
dag_args = {
"retries": 2,
"retry_delay": timedelta(minutes=2),
'on_failure_callback': on_failure_callback
}
然后在 DAG 定义上,我使用 [...] default_args=dag_args [...]
,但此选项不起作用。
实现这一目标的最佳方法是什么?
谢谢
最佳答案
最简单的方法,IMO 是在 DAG 失败时将其定义为默认参数。
default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': datetime(2015, 6, 1), 'email': ['airflow@example.com'], 'email_on_failure': False, 'email_on_retry': False, 'retries': 1, 'retry_delay': timedelta(minutes=5), # 'queue': 'bash_queue', # 'pool': 'backfill', # 'priority_weight': 10, # 'end_date': datetime(2016, 1, 1), }
如果您想根据任务依赖性指定发送电子邮件的行为,您还可以使用 sendgrid 运算符。 https://github.com/apache/airflow/blob/master/airflow/contrib/utils/sendgrid.py
关于python - Airflow 在发生故障时使所有 dags 执行特定操作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56085372/