python - 如何为 celery 和 prometheus 生成自定义应用程序指标/监控

标签 python python-3.x celery

我正在尝试在 celery 中生成自定义应用程序指标,并将它们拉入 prometheus 中。我正在使用danihodovic/celery-exporter导出 celery 任务指标,这些都是开箱即用的。但是,我找不到生成自定义应用程序指标的方法(例如内部方法调用的计数、方法的延迟等)。我使用了标准 python prometheus client在 flask 应用程序中成功,但我无法使其在 celery 中工作。

我可以看到 celery 监控与 celery events 绑定(bind)在一起,但是发布自定义事件不会在 prometheus 导出器中显示任何新指标。

代码:

#!/usr/bin/env python3
import os

from celery import Celery
from celery.events import EventDispatcher
from celery.utils.log import get_task_logger
from kombu import Queue
from prometheus_client import Counter

QUEUE_INPUT = 'test_queue'
TASK_NAME = 'test_task'
QUEUE_ARGS = {'x-queue-mode': 'lazy'}
QUEUE_CONN_STR = os.environ['RMQ_MASTER_CONN_STR']

__LOG = get_task_logger(__name__)

# Define our consumer object
app = Celery(TASK_NAME)

# Define routes
app.conf.update({
    'broker_url': "{0}".format(QUEUE_CONN_STR),
    'task_routes': {"test_celery.test_task": {"queue": QUEUE_INPUT}},
    'task_serializer': 'json',
    'task_send_sent_event': True,
    'worker_send_task_events': True,
    'result_serializer': 'json',
    "broker_pool_limit": 20,
    "task_always_eager": False,
    "result_expires": 2,
    'worker_enable_remote_control': False,
    'worker_prefetch_multiplier': 1
})

app.conf.task_queues = [
    Queue(QUEUE_INPUT, queue_arguments=QUEUE_ARGS),
]

c = Counter('test_counter', 'Number of hits')
d = Counter('test_counter_1', 'Number of msgs')
dispatcher = EventDispatcher(app.connection())
print("Hi")
dispatcher.send("C_INIT")
c.inc()


@app.task(name="test_task", bind=True, max_retries=2)
def test_task(self, payload: dict):
    d.inc()
    dispatcher.send("START_PROCESSING")
    print(payload)
    dispatcher.send("END_PROCESSING")
    

最佳答案

我求助于使用 prometheus push gateway 。仍在寻找更好的 celery 指标框架。

关于python - 如何为 celery 和 prometheus 生成自定义应用程序指标/监控,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/72540551/

相关文章:

python - 将 python 字符串转换为不带逗号的整数

python - 带有 Pandas 数据框的 CountVectorizer

python - 根据顺序将数组拆分为同等权重的 block

python - 如何从模型中获取权重和偏差?

python - 如何识别包含多个单词的字符串

python - 生命计数器不断重置

django - 如何保持Celery在Django(drf)+ Redis + WSGI(EC2)中运行

c++ - 使用 Redis 从 C++ 触发 Celery 任务

python - 使用上下文管理器从 celery 的 SoftTimeLimitExceeded 中恢复

python - Saleor 是否支持小数订单行数量?