python - Airflow 在发生故障时使所有 dags 执行特定操作

标签 python etl decorator airflow

我们有很多 DAG 在 Airflow 上运行。当某些事情失败时,我们希望得到通知,或者做出特定的操作:我已经尝试通过装饰器

def on_failure_callback(f):
  @wraps(f)
  def wrap(*args, **kwargs):
    try:
      return f(*args, **kwargs)
    except Exception as e:
      return f"An exception {e} on ocurred on{f}" 
  return wrap

这可行,但有必要装饰我们想要具有此行为的任何函数。

我看到了this并尝试像这样实现它:

def on_failure_callback(context):
    operator = PythonOperator(
        python_callable=failure)

    return operator.execute(context=context)


def failure():
    return 'Failure in the failure func'


dag_args = {
    "retries": 2,
    "retry_delay": timedelta(minutes=2),
    'on_failure_callback': on_failure_callback
}

然后在 DAG 定义上,我使用 [...] default_args=dag_args [...],但此选项不起作用。

实现这一目标的最佳方法是什么?

谢谢

最佳答案

最简单的方法,IMO 是在 DAG 失败时将其定义为默认参数。

default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': datetime(2015, 6, 1), 'email': ['airflow@example.com'], 'email_on_failure': False, 'email_on_retry': False, 'retries': 1, 'retry_delay': timedelta(minutes=5), # 'queue': 'bash_queue', # 'pool': 'backfill', # 'priority_weight': 10, # 'end_date': datetime(2016, 1, 1), }

如果您想根据任务依赖性指定发送电子邮件的行为,您还可以使用 sendgrid 运算符。 https://github.com/apache/airflow/blob/master/airflow/contrib/utils/sendgrid.py

关于python - Airflow 在发生故障时使所有 dags 执行特定操作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56085372/

相关文章:

python - 在heroku dyno中从 Node 到python进行http调用

Postgresql:从 csv 文件复制,偶尔缺少列

etl - Airflow 保留相同的数据库连接吗?

c++装饰器模式,带模板的静态多态性和注册回调方法

python - 如何自动将python类中的 "register"方法作为列表类变量?

python - 为什么函数装饰器中的 **kwargs 值与函数中的值不同?

python - 如何使用 cocos python 查找图层宽度/高度

python - 如何使用 matplotlib 将 timedelta 绘制为值

python编码mysql :(

sql - 对 ETL 的良好 SQL Server Integration Services (SSIS) 示例/样本的建议?