python-3.x - 我如何检查 airflow dag 中的所有任务是否成功?

标签 python-3.x airflow

我需要检查我的 dag 的所有任务是否都标记为成功,以便在 dag 的最后一个任务中它会向我发送一封电子邮件,通知我是否全部成功或是否失败。

这是我试过的一段代码:

dag_runs = DagRun.find(dag_id=self.dagId)
for dag_run in dag_runs:
                
   if dag_run.state == 'success':
       body = f'\nHello , \nHere is the values for the pipeline {self.dagId}: \ncount of lines is {new_lines}, \nMax  date is {new_date}. \nRegards!'
   else: 
       body= f'\nHello  \nYour dag {self.dagId} has been Failed'   

email_text = """\
        Subject: %s    
        \nFrom: %s
        \nTo: %s
        
        \n%s
        """ % (subject, sent_from, self.to, body)

   
try:
   smtp_server = smtplib.SMTP_SSL('smtp.gmail.com', 465)
   smtp_server.ehlo()
   smtp_server.login(self.gmail_user, self.gmail_password)
   smtp_server.sendmail(sent_from, self.to, email_text)
   smtp_server.close()
   print ("Email sent successfully!")

except Exception as ex:
   print ("Something went wrong….",ex)

我无法检查 dag 状态是否成功。所以我想检查所有任务的状态是否成功

提前感谢您的帮助和建议。

最佳答案

我们有一个类似的用例,我们想要确定所有任务是否成功。在 Airflow 中,如果任务失败并且我们有一个 trigger_rule one_failed,DAG 可以运行最终被标记为成功,因为有从失败中恢复。

我们使用单个电子邮件实现的解决方案来跟踪所有 task_instances:

from airflow.models.dagrun import DagRun
from airflow.models.taskinstance import TaskInstance

def check_all_success(**context):
    dr: DagRun = context["dag_run"]
    ti: TaskInstance = context["ti"]
    
    # here we remove the task currently executing this logic
    ti_summary = set([task.state for task in dr.get_task_instances() if task.task_id != ti.task_id])

    # Remove success state
    ti_summary.remove('success')

    # If TI summary had any other state except success, there was an issue in the run
    if ti:
        # Send email: All tasks in DAG: {dr.dag_id} did not complete successfully
        pass
    else:
        # Send email: All tasks in DAG: {dr.dag_id} completed successfully
        pass

check_all_tasks = PythonOperator(
    task_id='check_all_tasks',
    python_callable=check_all_success,
    provide_context=True
)

关于python-3.x - 我如何检查 airflow dag 中的所有任务是否成功?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/70692446/

相关文章:

python-3.x - 在KubernetesPods上设置HTTP请求设置

Airflow 平行度

mysql - 是否有 Django 等效项可以在 WHERE 子句中编写带有函数的 SQL SELECT 语句?

python - Python 3 中 OrderedDict 的 move_to_end 操作的时间复杂度是多少?

python - 如何读取 pandas dataframe 中的 json 并将一列值更改为大写并保存 json 文件

循环中的 Airflow DAG 任务依赖关系

airflow - 带有Airflow的AWS Batch执行器

python - 在 Airflow 中从一台服务器到另一台服务器的文件传输

django - CSV 文件从缓冲区上传到 S3

python - pandas 的开发版本给出 importerror : C extension: 'hashtable' not built on python 3. 4 (anaconda)