我想了解RabbitMQ中消息删除的逻辑。
我的目标是即使没有连接到读取它们的客户端也使消息持久化,以便当客户端重新连接时消息正在等待它们。我可以使用持久的惰性队列,以便将消息持久化到磁盘,并且我可以使用 HA 复制来确保多个节点获得所有排队消息的副本。
我希望使用主题或 header 路由将消息发送到两个或多个队列,并让一个或多个客户端读取每个队列。
我有两个队列,A 和 B,由 header 交换提供。队列 A 获取所有消息。队列 B 仅获取带有“归档” header 的消息。队列 A 有 3 个消费者正在阅读。队列 B 有 1 个消费者。如果 B 的消费者死了,但是 A 的消费者继续确认消息,RabbitMQ 会删除这些消息还是继续存储它们?在重新启动 B 之前,队列 B 不会有任何人使用它,我希望消息保持可用以供以后使用。
到目前为止,我已经阅读了大量文档,但仍然没有找到明确的答案。
最佳答案
RabbitMQ 将在确认后决定何时删除消息。
假设您有一个消息发送者:
var factory = new ConnectionFactory() { HostName = "localhost", Port = 5672, UserName = "guest", Password = "guest" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "hello",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
string message = "Hello World!";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "",
routingKey: "hello",
basicProperties: null,
body: body);
Console.WriteLine(" [x] Sent {0}", message);
}
这将创建一个持久队列“hello”并发送消息“Hello World!”到它。这就是向队列发送一条消息后队列的样子。
现在让我们设置两个消费者,一个确认收到消息,一个不确认。
channel.BasicConsume(queue: "hello",
autoAck: false,
consumer: consumer);
和
channel.BasicConsume(queue: "hello",
autoAck: true,
consumer: consumer);
如果你只运行第一个消费者,消息永远不会从队列中删除,因为消费者声明只有客户端手动确认消息才会从队列中消失:https://www.rabbitmq.com/confirms.html
然而,第二个消费者会告诉队列它可以安全地自动/立即删除它收到的所有消息。
如果您不想自动删除这些消息,则必须禁用 autoAck 并使用文档进行一些手动确认:
http://codingvision.net/tips-and-tricks/c-send-data-between-processes-w-memory-mapped-file (向下滚动到“手动确认”)。
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
关于rabbitmq - RabbitMQ 如何决定何时删除消息?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48816530/