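I need to check whether all the tasks in my DAG are marked as successful, so that the last task of the DAG can send me an email telling me whether everything succeeded or something failed.
Here is the code I tried: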
dag_runs = DagRun.find(dag_id=self.dagId)
for dag_run in dag_runs:
    if dag_run.state == 'success':
        body = f'\nHello , \nHere is the values for the pipeline {self.dagId}: \ncount of lines is {new_lines}, \nMax date is {new_date}. \nRegards!'
    else:
        body = f'\nHello \nYour dag {self.dagId} has been Failed'
email_text = """\
Subject: %s
\nFrom: %s
\nTo: %s
\n%s
""" % (subject, sent_from, self.to, body)
try:
    smtp_server = smtplib.SMTP_SSL('smtp.gmail.com', 465)
    smtp_server.ehlo()
    smtp_server.login(self.gmail_user, self.gmail_password)
    smtp_server.sendmail(sent_from, self.to, email_text)
    smtp_server.close()
    print("Email sent successfully!")
except Exception as ex:
    print("Something went wrong….", ex)
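I cannot tell from the DAG state whether the run succeeded, so I would like to check whether every task's state is success instead.
Thanks in advance for your help and advice.
Best answer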
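We have a similar use case where we want to determine whether all tasks were successful. In Airflow, a DAG run can end up marked as success even though a task failed: if a downstream task has the trigger_rule one_failed, the run counts as having recovered from the failure.
The solution we implemented tracks all task_instances and sends a single email: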
from airflow.models.dagrun import DagRun
from airflow.models.taskinstance import TaskInstance
# Airflow 2 import path; on Airflow 1.10 use airflow.operators.python_operator
from airflow.operators.python import PythonOperator

def check_all_success(**context):
    dr: DagRun = context["dag_run"]
    ti: TaskInstance = context["ti"]

    # Collect the distinct states of all tasks except the one currently executing this logic
    ti_summary = {task.state for task in dr.get_task_instances() if task.task_id != ti.task_id}
    # Drop the success state; discard() does not raise if it is absent
    ti_summary.discard('success')

    # If any state other than success remains, there was an issue in the run
    if ti_summary:
        # Send email: not all tasks in DAG {dr.dag_id} completed successfully
        pass
    else:
        # Send email: all tasks in DAG {dr.dag_id} completed successfully
        pass

check_all_tasks = PythonOperator(
    task_id='check_all_tasks',
    python_callable=check_all_success,
    provide_context=True,  # Airflow 1.10 only; Airflow 2 passes the context automatically
)
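The two "Send email" placeholders can be filled in without touching Airflow. Below is a minimal sketch in plain Python, assuming the task states have already been collected into a dict of the form {task.task_id: task.state}. The helper names non_success_states and build_status_email, the DAG id and the addresses are all hypothetical, chosen only for illustration. It builds the message with the standard library's EmailMessage so that the Subject, From and To headers are formatted correctly.

```python
from email.message import EmailMessage
from typing import Mapping, Optional, Set


def non_success_states(
    states_by_task: Mapping[str, Optional[str]],
    current_task_id: str,
) -> Set[Optional[str]]:
    """Return every state other than 'success', ignoring the checking task itself."""
    return {
        state
        for task_id, state in states_by_task.items()
        if task_id != current_task_id and state != "success"
    }


def build_status_email(
    dag_id: str,
    problems: Set[Optional[str]],
    sender: str,
    recipient: str,
) -> EmailMessage:
    """Build a report message; EmailMessage takes care of header formatting."""
    msg = EmailMessage()
    msg["From"] = sender
    msg["To"] = recipient
    if problems:
        found = ", ".join(sorted(str(state) for state in problems))
        msg["Subject"] = f"DAG {dag_id}: not all tasks succeeded"
        msg.set_content(f"Hello,\nDAG {dag_id} has tasks in these states: {found}.")
    else:
        msg["Subject"] = f"DAG {dag_id}: all tasks succeeded"
        msg.set_content(f"Hello,\nAll tasks in DAG {dag_id} completed successfully.")
    return msg


# Hypothetical states, shaped like {task.task_id: task.state}
states = {
    "extract": "success",
    "load": "failed",
    "recover": "success",
    "check_all_tasks": "running",
}
problems = non_success_states(states, current_task_id="check_all_tasks")
message = build_status_email("my_pipeline", problems, "me@example.com", "you@example.com")
```

The resulting message can be handed to smtplib with smtp_server.send_message(message) in place of sendmail. Note also that if the checking task sits downstream of the other tasks, the default trigger rule (all_success) would prevent it from running after a failure. Setting trigger_rule="all_done" on the PythonOperator is one way to make it run regardless.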
Source: python-3.x - How can I check whether all tasks in an Airflow DAG succeeded?, from a similar question on Stack Overflow: https://stackoverflow.com/questions/70692446/