rabbitmq - Celery/RabbitMQ - 找出 No Acks - 未确认的消息

标签 rabbitmq celery django-celery

我正在尝试找出如何获取有关未确认消息的信息。这些存储在哪里?在使用 celery 检查时,似乎一旦消息得到确认,它就会处理完毕,并且您可以跟踪状态。假设您有一个结果后端,那么您可以看到它的结果。但从你应用延迟的那一刻起,直到它被承认它处于黑洞中。

  1. noAcks 存储在哪里?
  2. 如何找出 noAcks 列表的“深度”?换句话说,有多少个任务以及我的任务在列表中的哪个位置。

虽然与我正在处理的问题并不完全相关。

from celery.app import app_or_default

app = app_or_default()
inspect = app.control.inspect()

# Now if I want "RECEIVED" jobs.. 
data = inspect.reserved()

# or "ACTIVE" jobs.. 
data = inspect.active()

# or "REVOKED" jobs.. 
data = inspect.revoked()

# or scheduled jobs.. (Assuming these are time based??)
data = inspect.scheduled()

# FILL ME IN FOR UNACK JOBS!!
# data = inspect.??

# This will never work for tasks that aren't in one of the above buckets..
pprint.pprint(inspect.query_task([tasks]))

我非常感谢您对此的建议和帮助。

最佳答案

它们是inspect.reserved()中的那些任务有 'acknowleged': False

from celery.app import app_or_default

app = app_or_default()
inspect = app.control.inspect()

# those that have been sent to a worker and are thus reserved
# from being sent to another worker, but may or may not be acknowledged as received by that worker
data = inspect.reserved()

{'celery.tasks': [{'acknowledged': False,
               'args': '[]',
               'delivery_info': {'exchange': 'tasks',
                                 'priority': None,
                                 'routing_key': 'celery'},
               'hostname': 'celery.tasks',
               'id': '527961d4-639f-4002-9dc6-7488dd8c8ad8',
               'kwargs': '{}',
               'name': 'globalapp.tasks.task_loop_tick',
               'time_start': None,
               'worker_pid': None},
              {'acknowledged': False,
               'args': '[]',
               'delivery_info': {'exchange': 'tasks',
                                 'priority': None,
                                 'routing_key': 'celery'},
               'hostname': 'celery.tasks',
               'id': '09d5b726-269e-48d0-8b0e-86472d795906',
               'kwargs': '{}',
               'name': 'globalapp.tasks.task_loop_tick',
               'time_start': None,
               'worker_pid': None},
              {'acknowledged': False,
               'args': '[]',
               'delivery_info': {'exchange': 'tasks',
                                 'priority': None,
                                 'routing_key': 'celery'},
               'hostname': 'celery.tasks',
               'id': 'de6d399e-1b37-455c-af63-a68078a9cf7c',
               'kwargs': '{}',
               'name': 'globalapp.tasks.task_loop_tick',
               'time_start': None,
               'worker_pid': None}],
 'fastlane.tasks': [],
 'images.tasks': [],
 'mailer.tasks': []}

关于rabbitmq - Celery/RabbitMQ - 找出 No Acks - 未确认的消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/23013249/

相关文章:

python - RabbitMQ:清除队列

java - 如何使Spring Integration AMQP队列事务解耦?

python - 如何在 python 中做一个简单的 Pika SelectConnection 来发送消息?

python-3.x - 类型错误:在运行基本 add.delay(1,2) 测试时无法 pickle 内存 View 对象

python - Celery:实例在一两周后变得缓慢

python - 安装具有依赖项的 python 包时如何在 PIP 上指定版本

python - Celery - 有顺序任务而不是并发?

django - 无法在 django 1.5.4 中迁移 djcelery

kubernetes - Rabbitmq-ha helm chart,管理插件抛出错误

python - 如何检查 Celery 中的任务状态?