我们有一个临时 Airflow DAG,任何人都可以触发它并从 50 人以上的团队中手动运行。
我们可以通过 dag id 检查触发 DAG 的 Airflow 审计日志,并且我们还可以在失败时收到电子邮件。
但我们更好奇的是,我们是否可以在 DAG 启动时或在每个任务运行开始时收到电子邮件,这将帮助我们了解和跟踪从 adhoc DAG 执行的事件和使用情况/命令。
最佳答案
@Khilesh Chauhan
有多种方法可以实现您的预期结果,排名不分先后:
<强>1。任务或 Dag 级别回调 Official Callback Reference
我们可以利用 on_success_callback
回调,它可以在两个不同的地方利用。
# use a function inside a specific PythonOperator, for task level control
task = PythonOperator(
task_id='your_task',
on_success_callback=send_mail,
)
# use it inside your DAG initiation
dag = DAG(
dag_id='your_task',
on_failure_callback=send_mail
)
我们可以编写一个示例 send_mail 函数,该函数利用 send_email 实用程序。
from airflow.utils.email import send_email
def send_mail(**context):
task = context['task_instance'].task
subject = f'Airflow task has successfully completed {task.task_id}'
body = f'Hi, this is an alert to let you know that your task {task.task_id} has completed successfully.'
send_email(
dag.default_args['email'],
subject,
body
)
<强>2。将 EmailOperator 添加到您的 DAG Official Email Operator Reference
您可以在 DAG 的开头添加一个 EmailOperator 任务。
from airflow.operators.email_operator import EmailOperator
email = EmailOperator(
task_id='alert_DAG_start',
to='<a href="https://stackoverflow.com/cdn-cgi/l/email-protection" class="__cf_email__" data-cfemail="f38a9c8681b3969e929a9fdd909c9e96" rel="noreferrer noopener nofollow">[email protected]</a>',
subject='DAG Initiated - start {{ ds }}',
html_content=""" <h1>Some Content</h1> """
)
<强>3。创建一个使用执行 send_email 的 PythonOperator 的函数
您可能需要更多控制,例如包含日志信息等。因此您可能需要更多控制来使用 PythonOperator。
希望这可以帮助您解决问题。
更新
为了回答您获取用户名的第二个问题,我创建了一个函数供您使用。我们可以导入 session 上下文管理器,然后使用 .query
方法,出于调试目的,我循环遍历数组。您可以在索引 3 处看到用户名。
from airflow.models.log import Log
from airflow.utils.db import create_session
def return_user_name(**context):
"""
return the username for the executed tasks
"""
dag_id = context['task_instance'].dag_id
with create_session() as session:
result = session.query(Log.dttm, Log.dag_id, Log.execution_date, Log.owner, Log.extra).filter(Log.dag_id == dag_id, Log.event == 'trigger').first()
for index, result in enumerate(result):
print(index, result)
关于airflow - 如何触发 Airflow 任务运行的电子邮件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/71590136/