python - 海带,RabbitMQ : Ack message more than once in a consumer mixin

标签 python rabbitmq kombu message-ack

我在为新的 SO 文档项目记录 Kombu 时偶然发现了这个问题。

考虑以下昆布代码 Consumer Mixin :

from kombu import Connection, Queue
from kombu.mixins import ConsumerMixin
from kombu.exceptions import MessageStateError
import datetime

# Send a message to the 'test_queue' queue
with Connection('amqp://guest:guest@localhost:5672//') as conn:
    with conn.SimpleQueue(name='test_queue') as queue:
        queue.put('String message sent to the queue')


# Callback functions
def print_upper(body, message):
    print body.upper()
    message.ack()    

def print_lower(body, message):
    print body.lower()
    message.ack()


# Attach the callback function to a queue consumer 
class Worker(ConsumerMixin):
    def __init__(self, connection):
        self.connection = connection

    def get_consumers(self, Consumer, channel):
        return [
            Consumer(queues=Queue('test_queue'), callbacks=[print_even_characters, print_odd_characters]),
        ]

# Start the worker
with Connection('amqp://guest:guest@localhost:5672//') as conn:
    worker = Worker(conn)
    worker.run()

代码失败并显示:

kombu.exceptions.MessageStateError: Message already acknowledged with state: ACK

因为消息在 print_even_characters()print_odd_characters() 上被确认两次。

一个有效的简单解决方案是仅确认最后一个回调函数,但如果我想在其他队列或连接上使用相同的函数,它会破坏模块化。

如何确认发送到多个回调函数的排队 Kombu 消息?

最佳答案

解决方案

1 - 检查message.acknowledged

message.acknowledged 标志检查消息是否已被确认:

def print_upper(body, message):
    print body.upper()
    if not message.acknowledged: 
        message.ack()


def print_lower(body, message):
    print body.lower()
    if not message.acknowledged: 
        message.ack()

优点:可读、简短。

缺点:中断 Python EAFP idiom .

2 - 捕获异常

def print_upper(body, message):
    print body.upper()
    try:
        message.ack()
    except MessageStateError:
        pass


def print_lower(body, message):
    print body.lower()
    try:
        message.ack()
    except MessageStateError:
        pass

优点:可读,Pythonic。

缺点:有点长 - 每个回调有 4 行样板代码。

3 - 确认最后一个回调

该文档保证 callbacks are called in order 。因此,我们可以简单地 .ack() 仅最后一个回调:

def print_upper(body, message):
    print body.upper()


def print_lower(body, message):
    print body.lower()
    message.ack()

优点:简短、可读、无样板代码。

缺点:非模块化:回调不能被其他队列使用,除非最后一个回调始终是最后一个。这种隐含的假设可能会破坏调用者代码。

这可以通过将回调函数移至 Worker 类中来解决。我们放弃了一些模块化——这些函数不会从外部调用——但获得了安全性和可读性。

摘要

1 和 2 之间的区别仅仅是风格问题。

如果执行顺序很重要,以及消息在成功完成所有回调之前是否不应被 ACK,则应选择解决方案 3。

如果消息始终被确认,即使一个或多个回调失败,也应选择 1 或 2。

请注意,还有其他可能的设计;这个答案指的是驻留在工作器外部的回调函数。

关于python - 海带,RabbitMQ : Ack message more than once in a consumer mixin,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39068790/

相关文章:

python - 使用 C++ 流序列化数字会导致 Mac 上的 boost.python 崩溃

node.js - Rabbitmq 事件并发和同步

php - AMQP - 连接超时

Django&Celery-路由问题

python - 如何使用 Django-Scheduler 包?

python - 有什么方法可以从命令行读取仲裁者管理的活跃的 gunicorn 工作人员的数量?

redis - “排队”教程和文档?

python - celery :socket.error 超时

Python MySQLdb唯一记录,忽略错误

java - Spring RabbitMQ - xml配置 - 手动ack