airflow - 如何触发 Airflow 任务运行的电子邮件

标签 airflow

我们有一个临时 Airflow DAG,任何人都可以触发它并从 50 人以上的团队中手动运行。

我们可以通过 dag id 检查触发 DAG 的 Airflow 审计日志,并且我们还可以在失败时收到电子邮件。

但我们更好奇的是,我们是否可以在 DAG 启动时或在每个任务运行开始时收到电子邮件,这将帮助我们了解和跟踪从 adhoc DAG 执行的事件和使用情况/命令。

最佳答案

@Khilesh Chauhan

有多种方法可以实现您的预​​期结果,排名不分先后:

<强>1。任务或 Dag 级别回调 Official Callback Reference

我们可以利用 on_success_callback 回调,它可以在两个不同的地方利用。

# use a function inside a specific PythonOperator, for task level control
task = PythonOperator(
  task_id='your_task',
  on_success_callback=send_mail,
)

# use it inside your DAG initiation 
dag = DAG(
    dag_id='your_task',
    on_failure_callback=send_mail
)

我们可以编写一个示例 send_mail 函数,该函数利用 send_email 实用程序。

from airflow.utils.email import send_email

def send_mail(**context):
  task = context['task_instance'].task

  subject = f'Airflow task has successfully completed {task.task_id}'
  body = f'Hi, this is an alert to let you know that your task {task.task_id} has completed successfully.'

  send_email(
    dag.default_args['email'],
    subject,
    body
  )

<强>2。将 EmailOperator 添加到您的 DAG Official Email Operator Reference

您可以在 DAG 的开头添加一个 EmailOperator 任务。

from airflow.operators.email_operator import EmailOperator

email = EmailOperator(
        task_id='alert_DAG_start',
        to='<a href="https://stackoverflow.com/cdn-cgi/l/email-protection" class="__cf_email__" data-cfemail="f38a9c8681b3969e929a9fdd909c9e96" rel="noreferrer noopener nofollow">[email protected]</a>',
        subject='DAG Initiated - start {{ ds }}',
        html_content=""" <h1>Some Content</h1> """
)

<强>3。创建一个使用执行 send_email 的 PythonOperator 的函数

您可能需要更多控制,例如包含日志信息等。因此您可能需要更多控制来使用 PythonOperator。

希望这可以帮助您解决问题。

更新

为了回答您获取用户名的第二个问题,我创建了一个函数供您使用。我们可以导入 session 上下文管理器,然后使用 .query 方法,出于调试目的,我循环遍历数组。您可以在索引 3 处看到用户名。

from airflow.models.log import Log
from airflow.utils.db import create_session

def return_user_name(**context):
  """
  return the username for the executed tasks
  """

  dag_id = context['task_instance'].dag_id

  with create_session() as session:
   result = session.query(Log.dttm, Log.dag_id, Log.execution_date, Log.owner, Log.extra).filter(Log.dag_id == dag_id, Log.event == 'trigger').first()
   for index, result in enumerate(result):
     print(index, result)

关于airflow - 如何触发 Airflow 任务运行的电子邮件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/71590136/

相关文章:

airflow - 使用DB动态生成 Airflow 任务

Airflow 基本身份验证隐藏管理菜单选项

python - 如何将 Apache Airflow Python 路径从 Python 2.7 更改为 3?

python - apache Airflow 调度程序未调度作业

python - 使用 Airflow 在运行时导出环境变量

python - Google Cloud Composer 安装依赖项的时间太长

pytest - Airflow - DAG 完整性测试 - sqlalchemy.exc.OperationalError : (sqlite3. OperationalError)没有这样的表:变量

python - Airflow 工作器 - 连接中断 : IncompleteRead(0 bytes read)

mysql - 如何在 MySqlOperator 中使用 airflow xcoms

amazon-s3 - S3_delete_objects_operator 不删除bucket中的文件