python - 当 prefetch_count == 1 时拒绝并重新排队 RabbitMQ 任务

标签 python rabbitmq pika

假设我有一个包含五个项目的队列:

(tail) E, D, C, B, A (head)

我从这个队列的头部消费消息,但决定该消息 A目前不适合处理。我 reject带有 requeue=True 的项目,队列变为:

(tail) A, E, D, C, B (head)

然后我消费 B , C , D , 和 E , ack每一个。现在队列只包含 A ,我不断消费和reject在永无止境的循环中一遍又一遍。如果是新的,非 A消息进来后,它几乎立即被消耗掉,然后进程恢复其尝试消耗 A 的循环.

我对 Twisted Consumer Example 稍作修改来执行此操作来自鼠兔文档:

import pika
from pika import exceptions
from pika.adapters import twisted_connection
from twisted.internet import defer, reactor, protocol,task


@defer.inlineCallbacks
def run(connection):

    channel = yield connection.channel()

    exchange = yield channel.exchange_declare(exchange='topic_link',type='topic')

    queue = yield channel.queue_declare(queue='hello', auto_delete=False, exclusive=False)

    yield channel.queue_bind(exchange='topic_link',queue='hello',routing_key='hello.world')

    #yield channel.basic_qos(prefetch_count=1)

    queue_object, consumer_tag = yield channel.basic_consume(queue='hello',no_ack=False)

    l = task.LoopingCall(read, queue_object)

    l.start(0.01)


@defer.inlineCallbacks
def read(queue_object):

    ch,method,properties,body = yield queue_object.get()

    print body

    if body == 'A':
        yield ch.basic_reject(delivery_tag=method.delivery_tag, requeue=True)
    else:
        yield ch.basic_ack(delivery_tag=method.delivery_tag)


parameters = pika.ConnectionParameters()
cc = protocol.ClientCreator(reactor, twisted_connection.TwistedProtocolConnection, parameters)
d = cc.connectTCP('hostname', 5672)
d.addCallback(lambda protocol: protocol.ready)
d.addCallback(run)
reactor.run()

问题:注意以下注释掉的行:

#yield channel.basic_qos(prefetch_count=1)

当我取消注释时,消费者收到消息 A , 它会在 reject 之后立即再次拾取它正在处理它,忽略可能在它后面的队列中等待的任何其他项目。它不会将被拒绝的项目放在队列的尾部,而是不断尝试,一遍又一遍,完全阻塞队列中的所有其他项目。

注释掉该行后,它可以正常工作(尽管速度稍慢)。如果该行存在并且 prefetch_count > 1 , 它也有效。关于将其设置为恰好 1 的一些事情触发此行为。

我在拒绝消息 A 时是否遗漏了一个步骤? ?或者 Pika 的预取系统从根本上与这种边缘情况不兼容?

最佳答案

如果您只有一个消费者,那么 RabbitMQ 除了向被拒绝的同一消费者发送消息外别无他法(无论如何:使用 basic.reject 或 basic.nack)。

当您设置 prefetch_count > 1 时那么您的消费者将收到您的循环消息以及循环旁边头部的新消息(从字面上看,您的循环消息将保留在头部)。

如果不小心N*M循环消息 prefetch_count <= N和消费者数量 <= M您将循环所有消息(这会导致 CPU 烧毁等),因此检查 rejected 可能是个不错的选择消息标志并在消息已重新传递时具有一些高级逻辑。

关于python - 当 prefetch_count == 1 时拒绝并重新排队 RabbitMQ 任务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/24333840/

相关文章:

python - 如何使用 PyCharm 在 Python 控制台中使用绿色 "Attach Debugger"按钮

python lxml 添加一个保留所有父树的子元素

rabbitmq - 确认第一条消息后发送第二条消息。 RabbitMQ 能保证顺序吗?

php - ACCESS_REFUSED - 使用身份验证机制 AMQPLAIN 拒绝登录

python - 是否可以在 Python 中将 RabbitMQ 直接回复功能与 Pika 生成器使用者一起使用?

python - 如何用python删除最后一行文本?

python eval ('import foo' )引发 SyntaxError

rabbitmq - RabbitMQ 客户端如何判断它何时失去与服务器的连接?

python - 使用 Pika 客户端轮询 RabbitMQ 消息

python - 如何重启消费者rabbitmq pika python