我正在尝试在 celery 中生成自定义应用程序指标,并将它们拉入 prometheus 中。我正在使用danihodovic/celery-exporter导出 celery 任务指标,这些都是开箱即用的。但是,我找不到生成自定义应用程序指标的方法(例如内部方法调用的计数、方法的延迟等)。我使用了标准 python prometheus client在 flask 应用程序中成功,但我无法使其在 celery 中工作。
我可以看到 celery 监控与 celery events 绑定(bind)在一起,但是发布自定义事件不会在 prometheus 导出器中显示任何新指标。
代码:
#!/usr/bin/env python3
import os
from celery import Celery
from celery.events import EventDispatcher
from celery.utils.log import get_task_logger
from kombu import Queue
from prometheus_client import Counter
QUEUE_INPUT = 'test_queue'
TASK_NAME = 'test_task'
QUEUE_ARGS = {'x-queue-mode': 'lazy'}
QUEUE_CONN_STR = os.environ['RMQ_MASTER_CONN_STR']
__LOG = get_task_logger(__name__)
# Define our consumer object
app = Celery(TASK_NAME)
# Define routes
app.conf.update({
'broker_url': "{0}".format(QUEUE_CONN_STR),
'task_routes': {"test_celery.test_task": {"queue": QUEUE_INPUT}},
'task_serializer': 'json',
'task_send_sent_event': True,
'worker_send_task_events': True,
'result_serializer': 'json',
"broker_pool_limit": 20,
"task_always_eager": False,
"result_expires": 2,
'worker_enable_remote_control': False,
'worker_prefetch_multiplier': 1
})
app.conf.task_queues = [
Queue(QUEUE_INPUT, queue_arguments=QUEUE_ARGS),
]
c = Counter('test_counter', 'Number of hits')
d = Counter('test_counter_1', 'Number of msgs')
dispatcher = EventDispatcher(app.connection())
print("Hi")
dispatcher.send("C_INIT")
c.inc()
@app.task(name="test_task", bind=True, max_retries=2)
def test_task(self, payload: dict):
d.inc()
dispatcher.send("START_PROCESSING")
print(payload)
dispatcher.send("END_PROCESSING")
最佳答案
我求助于使用 prometheus push gateway 。仍在寻找更好的 celery 指标框架。
关于python - 如何为 celery 和 prometheus 生成自定义应用程序指标/监控,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/72540551/