我在为新的 SO 文档项目记录 Kombu 时偶然发现了这个问题。
考虑以下昆布代码 Consumer Mixin :
from kombu import Connection, Queue
from kombu.mixins import ConsumerMixin
from kombu.exceptions import MessageStateError
import datetime
# Send a message to the 'test_queue' queue
with Connection('amqp://guest:guest@localhost:5672//') as conn:
with conn.SimpleQueue(name='test_queue') as queue:
queue.put('String message sent to the queue')
# Callback functions
def print_upper(body, message):
print body.upper()
message.ack()
def print_lower(body, message):
print body.lower()
message.ack()
# Attach the callback function to a queue consumer
class Worker(ConsumerMixin):
def __init__(self, connection):
self.connection = connection
def get_consumers(self, Consumer, channel):
return [
Consumer(queues=Queue('test_queue'), callbacks=[print_even_characters, print_odd_characters]),
]
# Start the worker
with Connection('amqp://guest:guest@localhost:5672//') as conn:
worker = Worker(conn)
worker.run()
代码失败并显示:
kombu.exceptions.MessageStateError: Message already acknowledged with state: ACK
因为消息在 print_even_characters()
和 print_odd_characters()
上被确认两次。
一个有效的简单解决方案是仅确认最后一个回调函数,但如果我想在其他队列或连接上使用相同的函数,它会破坏模块化。
如何确认发送到多个回调函数的排队 Kombu 消息?
最佳答案
解决方案
1 - 检查message.acknowledged
message.acknowledged
标志检查消息是否已被确认:
def print_upper(body, message):
print body.upper()
if not message.acknowledged:
message.ack()
def print_lower(body, message):
print body.lower()
if not message.acknowledged:
message.ack()
优点:可读、简短。
缺点:中断 Python EAFP idiom .
2 - 捕获异常
def print_upper(body, message):
print body.upper()
try:
message.ack()
except MessageStateError:
pass
def print_lower(body, message):
print body.lower()
try:
message.ack()
except MessageStateError:
pass
优点:可读,Pythonic。
缺点:有点长 - 每个回调有 4 行样板代码。
3 - 确认最后一个回调
该文档保证 callbacks are called in order 。因此,我们可以简单地 .ack()
仅最后一个回调:
def print_upper(body, message):
print body.upper()
def print_lower(body, message):
print body.lower()
message.ack()
优点:简短、可读、无样板代码。
缺点:非模块化:回调不能被其他队列使用,除非最后一个回调始终是最后一个。这种隐含的假设可能会破坏调用者代码。
这可以通过将回调函数移至 Worker
类中来解决。我们放弃了一些模块化——这些函数不会从外部调用——但获得了安全性和可读性。
摘要
1 和 2 之间的区别仅仅是风格问题。
如果执行顺序很重要,以及消息在成功完成所有回调之前是否不应被 ACK,则应选择解决方案 3。
如果消息始终被确认,即使一个或多个回调失败,也应选择 1 或 2。
请注意,还有其他可能的设计;这个答案指的是驻留在工作器外部的回调函数。
关于python - 海带,RabbitMQ : Ack message more than once in a consumer mixin,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39068790/