python - 如何将参数传递给 Airflow on_success_callback 和 on_failure_callback

标签 python airflow airflow-scheduler

我已经使用 on_success_callback 和 on_failure_callback 实现了关于成功和失败的电子邮件警报。

根据 Airflow documentation ,

a context dictionary is passed as a single parameter to this function.



如何将另一个参数传递给这些回调方法?

这是我的代码
from airflow.utils.email import send_email_smtp

def task_success_alert(context):
    subject = "[Airflow] DAG {0} - Task {1}: Success".format(
        context['task_instance_key_str'].split('__')[0], 
        context['task_instance_key_str'].split('__')[1]
        )
    html_content = """
    DAG: {0}<br>
    Task: {1}<br>
    Succeeded on: {2}
    """.format(
        context['task_instance_key_str'].split('__')[0], 
        context['task_instance_key_str'].split('__')[1], 
        datetime.now()
        )
    send_email_smtp(dag_vars["dev_mailing_list"], subject, html_content)

def task_failure_alert(context):
    subject = "[Airflow] DAG {0} - Task {1}: Failed".format(
        context['task_instance_key_str'].split('__')[0], 
        context['task_instance_key_str'].split('__')[1]
        )
    html_content = """
    DAG: {0}<br>
    Task: {1}<br>
    Failed on: {2}
    """.format(
        context['task_instance_key_str'].split('__')[0], 
        context['task_instance_key_str'].split('__')[1], 
        datetime.now()
        )
    send_email_smtp(dag_vars["dev_mailing_list"], subject, html_content)

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2019, 6, 13),
    'on_success_callback': task_success_alert,
    'on_failure_callback': task_failure_alert
}

我打算将回调移动到另一个包并将电子邮件地址作为参数传递。

最佳答案

您可以在 dag 中定义一个函数,该函数从您的包中调用该函数。在调用该函数时,将电子邮件作为参数传递。您可以在 DAG 级别进一步细化它以仅传递电子邮件所需的信息。

from package import outer_task_success_callback
email = 'xyz@example.com'

def task_success_alert(context):
    dag_id = context['dag'].dag_id
    task_id = context['task_instance']. task_id
    outer_task_success_callback(dag_id, task_id, email)
    
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2019, 6, 13),
    'on_success_callback': task_success_alert,
    'on_failure_callback': task_failure_alert
}
这将允许您在调用包中的函数之前进行自定义。
附带说明一下, Airflow 具有 smtp 电子邮件功能。您可以利用这些解决方案,而不是编写自己的解决方案。

关于python - 如何将参数传递给 Airflow on_success_callback 和 on_failure_callback,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56697838/

相关文章:

python - 如何仅使用客户 ID 链接 Google Adwords API 中的帐户?

airflow - 写入 Airflow 日志

airflow - 在 Airflow 中调度 dag 运行

airflow - 如果任务 1 失败,如何在运行时添加任务

python - 即使我没有给出 Airflow 的上游或下游,任务也会显示

python - 评论 : Built-In Functions

python - 在 Pandas 中使用 Groupby 对象并重新采样

Python Scikit-凝胶电泳数据的图像处理

Airflow 1.9.0 正在排队但任务未运行

airflow - 回填/清除旧 DAG 时 dagrun_timeout 是否会干扰?