TLDR;在消费者动态创建的主题交换和队列的上下文中,如何在没有消费者消费消息时重新传递消息/通知生产者?
我有以下组件:
我目前有一个 RabbitMQ 主题交换。
routing_key = file_category
向交换发送消息. 现在 - 这工作正常,但它仍然有一个主要问题。目前,如果发布者发送带有没有消费者绑定(bind)的路由键的消息,则该消息将丢失。这是因为即使消费者创建的队列是持久的,一旦消费者断开连接它就会被销毁,因为它是这个消费者唯一的 .
消费者代码(python):
channel.exchange_declare(exchange=exchange_name, type='topic', durable=True)
result = channel.queue_declare(exclusive = True, durable=True)
queue_name = result.method.queue
topics = [ "pictures.*", "videos.trending" ]
for topic in topics:
channel.queue_bind(exchange=exchange_name, queue=queue_name, routing_key=topic)
channel.basic_consume(my_handler, queue=queue_name)
channel.start_consuming()
在我的用例中,在这种情况下丢失消息是 Not Acceptable 。
尝试的解决方案
但是,“丢失”消息是可以接受的 如果通知生产者没有消费者收到消息 (在这种情况下,它可以稍后重新发送)。我发现必填字段可能会有所帮助,因为 AMQP 的规范指出:
This flag tells the server how to react if the message cannot be routed to a queue. If this flag is set, the server will return an unroutable message with a Return method.
这确实有效 - 在制作人中,我可以注册
ReturnListener
:rabbitMq.confirmSelect();
rabbitMq.addReturnListener( (int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) -> {
log.info("A message was returned by the broker");
});
rabbitMq.basicPublish(exchangeName, "pictures.profile", true /* mandatory */, MessageProperties.PERSISTENT_TEXT_PLAIN, messageBytes);
这将按预期打印
A message was returned by the broker
如果使用路由键发送消息,则没有消费者绑定(bind)到。现在,我还想知道消息何时被消费者正确接收。所以我尝试注册一个
ConfirmListener
还有:rabbitMq.addConfirmListener(new ConfirmListener() {
void handleAck(long deliveryTag, boolean multiple) throws IOException {
log.info("ACK message {}, multiple = ", deliveryTag, multiple);
}
void handleNack(long deliveryTag, boolean multiple) throws IOException {
log.info("NACK message {}, multiple = ", deliveryTag, multiple);
}
});
这里的问题是发送了 ACK 经纪人 ,而不是消费者本身。所以当生产者发送带有路由键 的消息时K :
参见文档:
For unroutable messages, the broker will issue a confirm once the exchange verifies a message won't route to any queue (returns an empty list of queues). If the message is also published as mandatory, the basic.return is sent to the client before basic.ack. The same is true for negative acknowledgements (basic.nack).
因此,虽然我的问题理论上可以使用它来解决,但它会使了解消息是否被正确使用的逻辑变得非常复杂(尤其是在多线程、数据库中的持久性等上下文中):
send a message
on receive ACK:
if no basic.return was received for this message
the message was correctly consumed
else
the message wasn't correctly consumed
on receive basic.return
the message wasn't correctly consumed
可能的其他解决方案
consumer1
创建 queue-consumer-1
这与 myExchange
的消息绑定(bind)拥有路由 key abcd
.我预见的问题是它意味着为每个消费者应用程序实例找到一个唯一标识符(例如它运行的机器的主机名)。 我很乐意就此提供一些意见 - 谢谢!
相关:
[编辑] 解决方案
如前所述,我最终实现了使用 basic.return 的东西。实际上实现起来并不那么棘手,你只需要确保你的方法产生消息和处理基本返回的方法是同步的(或者如果不在同一个类中则有一个共享锁),否则你最终会得到交错的执行流程会弄乱您的业务逻辑。
最佳答案
我相信an alternate exchange对于有关识别未路由消息的部分,将最适合您的用例。
Whenever an exchange with a configured AE cannot route a message to any queue, it publishes the message to the specified AE instead.
基本上在创建“主”交换时,您为其配置一个备用交换。
对于引用的备用交换,我倾向于使用扇出,然后创建一个绑定(bind)到它的队列 (notroutedq)。
这意味着任何未发布到至少一个绑定(bind)到“主”交换的队列的消息都将在 notroutedq 中结束
现在关于你的陈述:
because even if the queue created by the consumers is durable, it is destroyed as soon as the consumer disconnects since it is unique to this consumer.
似乎您已将队列配置为将自动删除设置为 true。
如果是这样,如果断开连接,如您所说,队列将被破坏,队列中仍然存在的消息将丢失,备用交换配置未涵盖的情况 .
从您的用例描述中不清楚您是否希望在某些情况下消息最终出现在多个队列中,似乎更像是每种预期处理类型都有一个队列(同时保持分组灵活)。如果队列拆分确实与处理类型有关,我看不到使用自动删除设置队列的好处,希望在您想要更改绑定(bind)时可能不必进行任何清理维护。
假设您可以使用持久队列,那么 dead letter exchange (将再次与扇出一起使用)绑定(bind)到 dlq 将覆盖丢失的情况。
关于java - 确保至少一个消费者收到在主题交换上发布的消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42932933/