python - Celery 为每个失败的任务发送邮件

标签 python django django-celery celery-task

我在我的 Django 应用程序中使用 celery ,我设置了 celery 花来监控 celery 的任务。我有设置任务,当用户注册/提交/FP 等事件时,电子邮件会发送给用户。现在 Flower 向我提供了任务的详细信息及其状态。现在,对于每个失败的任务,我都希望将一封电子邮件发送到我的帐户,这样我就不必每天为失败的任务检查花。 我在我的settings.py文件中做了如下配置

CELERY_SEND_TASK_ERROR_EMAILS = True 

管理员

EMAIL_USE_TLS = True
EMAIL_HOST_USER = 'noreply@xyz.com'
EMAIL_HOST_PASSWORD = 'xyz123@'
DEFAULT_FROM_EMAIL = EMAIL_HOST_USER
SERVER_EMAIL = EMAIL_HOST_USER
EMAIL_HOST = 'xyz.abc.com'
EMAIL_BACKEND = 'django.core.mail.backends.smtp.EmailBackend'

这些是“发件人”电子邮件地址的设置。 几天前,我的一位团队成员不小心更改了上述 email_host 的密码,忘记更新设置文件。 由于 SMTP 身份验证错误,任务失败并不是太晚了。

有没有办法解决这个问题,即使发生 SMTP 身份验证错误,我也会立即收到来自 celery 的电子邮件?我不太确定这一点。

是否有任何其他工具可以监控我的任务并针对每个失败的任务向我发送邮件。

最佳答案

我发现有一个表是为了维护数据库中的任务而创建的。所以我只是简单地创建了一个脚本,它会每小时检查表中过去一小时的记录是否有失败的任务,如果发现任何失败的任务,它会发送一封电子邮件。

脚本.py

#!venv/bin/python2

import os
from django.conf import settings

if __name__ == '__main__' and __package__ is None:
    os.sys.path.append(
        os.path.dirname(
            os.path.dirname(
                os.path.abspath(__file__))))

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "rest_apis.settings")

import django

django.setup()
from django.core.mail import EmailMessage
from djcelery.models import TaskMeta
from datetime import datetime, timedelta, time


USERS_TO_NOTIFY = ['ops@yopmail.com']
TIME_THRESHOLD_INTERVAL = 60

def send_email(email_subject_line, email_body):
    email = EmailMessage(email_subject_line,
                         email_body,
                         settings.EMAIL_HOST_USER,
                         USERS_TO_NOTIFY
                         )
    email.send()


def main():
    current_time = datetime.now()   # Get Current TimeStamp
    time_threshold = current_time - timedelta(minutes=TIME_THRESHOLD_INTERVAL) # Get 60 minutes past current time stamp
    celery_taskmeta_objects = TaskMeta.objects.filter(status="FAILURE", date_done__gte=time_threshold)

    email_body = "Below are the tasks which failed : "
    if celery_taskmeta_objects.exists():
        for celery_taskmeta in celery_taskmeta_objects:
            print celery_taskmeta.task_id
            email_body += "\n\ntask_id : %s" % celery_taskmeta.task_id
            email_body += "\nstatus : %s" % celery_taskmeta.status
            email_body += "\ndate : %s" % celery_taskmeta.date_done
            email_body += "\ntraceback :"
            email_body += "\n%s\n\n" % celery_taskmeta.traceback
        email_subject_line = '[URGENT] Celery task failure in last %s minutes' % (TIME_THRESHOLD_INTERVAL)
        send_email(email_subject_line, email_body)


main()

现在在一封电子邮件中,我也得到了完整的堆栈跟踪和任务的 ID。现在我的要求是每小时检查一次,所以我只是将脚本放在 crontab 中。现在您可以根据您的基本需要更改时间阈值并相应地工作。

关于python - Celery 为每个失败的任务发送邮件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36540515/

相关文章:

python - Graphene Django图像字段获取绝对路径

python - Django:类对象没有标准函数的属性

python - 如何使用 Python/Django 作为后端进行 ReactJS 应用程序的服务器端渲染?

python - Celeryd 时间约束错误

django - Django Celery : manage. py celeryd返回 “Unknown Command”

python - Python中的模块名称冲突,如何解决?

python - 如何将 PyLab 与 PySimpleGUI 一起嵌入?

Python Random.Choice 除了一个选项

python - python中是否有隐藏规则控制如何显示小数的精度

python - celery 任务分析