python - 如何使用 Python 检查 Celery/Supervisor 是否正在运行

标签 python flask celery supervisord

如果 celery 在机器(Ubuntu)上运行,如何用 Python 编写脚本输出?

我的用例。我有一个包含一些任务的简单 python 文件。我没有使用 Django 或 Flask。我使用主管来运行任务队列。例如,

tasks.py

from celery import Celery, task
app = Celery('tasks')
@app.task()
def add_together(a, b):
    return a + b

主管:

[program:celery_worker]
directory = /var/app/
command=celery -A tasks worker info

这一切都有效,我现在想要有一个页面来检查 celery/supervisor 进程是否正在运行。即这样的事情可能使用 Flask 允许我托管页面提供 200 状态允许我进行负载平衡。

例如...

check_status.py

from flask import Flask

app = Flask(__name__)

@app.route('/')
def status_check():

    #check supervisor is running
    if supervisor:
         return render_template('up.html')
    else:
        return render_template('down.html')

if __name__ == '__main__':
    app.run()

最佳答案

更新 09/2020:Jérôme 在此处更新了 Celery 4.3 的此答案:https://stackoverflow.com/a/57628025/1159735

您可以通过导入celery.bin.celery包通过代码运行celery status命令:

import celery
import celery.bin.base
import celery.bin.celery
import celery.platforms

app = celery.Celery('tasks', broker='redis://')

status = celery.bin.celery.CeleryCommand.commands['status']()
status.app = status.get_app()

def celery_is_up():
    try:
        status.run()
        return True
    except celery.bin.base.Error as e:
        if e.status == celery.platforms.EX_UNAVAILABLE:
            return False
        raise e

if __name__ == '__main__':
    if celery_is_up():
        print('Celery up!')
    else:
        print('Celery not responding...')

关于python - 如何使用 Python 检查 Celery/Supervisor 是否正在运行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33446350/

相关文章:

python - 如何诊断 "ImportError: cannot import name"

python - Pandas ,重命名多索引列(数据顺序已更改)

python - SciPy 与 MKL 库的兼容性问题

python - Apache 网络服务器和 Flask 应用程序

python - Celery Beat Windows 简单示例(不适用于 Django)

python - 使用 python 和 Qt 创建一个准确的节拍器

python - 套接字错误 :[errno 99] cannot assign requested address : flask and python

python - 如何确定在 Tornado/Flask/Python 上上传 HTTP 请求需要多长时间?

python - 使用 Celery 时 Scrapy 蜘蛛不跟踪链接

django - Celery Queue 似乎没有注册我的任务