rabbitmq - RabbitMQ 如何决定何时删除消息?

标签 rabbitmq high-availability queueing

我想了解RabbitMQ中消息删除的逻辑。

我的目标是即使没有连接到读取它们的客户端也使消息持久化,以便当客户端重新连接时消息正在等待它们。我可以使用持久的惰性队列,以便将消息持久化到磁盘,并且我可以使用 HA 复制来确保多个节点获得所有排队消息的副本。

我希望使用主题或 header 路由将消息发送到两个或多个队列,并让一个或多个客户端读取每个队列。

我有两个队列,A 和 B,由 header 交换提供。队列 A 获取所有消息。队列 B 仅获取带有“归档” header 的消息。队列 A 有 3 个消费者正在阅读。队列 B 有 1 个消费者。如果 B 的消费者死了,但是 A 的消费者继续确认消息,RabbitMQ 会删除这些消息还是继续存储它们?在重新启动 B 之前,队列 B 不会有任何人使用它,我希望消息保持可用以供以后使用。

到目前为止,我已经阅读了大量文档,但仍然没有找到明确的答案。

最佳答案

RabbitMQ 将在确认后决定何时删除消息。

假设您有一个消息发送者:

var factory = new ConnectionFactory() { HostName = "localhost", Port = 5672, UserName = "guest", Password = "guest" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
    channel.QueueDeclare(queue: "hello",
                         durable: true,
                         exclusive: false,
                         autoDelete: false,
                         arguments: null);

    string message = "Hello World!";
    var body = Encoding.UTF8.GetBytes(message);

    channel.BasicPublish(exchange: "",
                         routingKey: "hello",
                         basicProperties: null,
                         body: body);
    Console.WriteLine(" [x] Sent {0}", message);
}

这将创建一个持久队列“hello”并发送消息“Hello World!”到它。这就是向队列发送一条消息后队列的样子。

enter image description here

现在让我们设置两个消费者,一个确认收到消息,一个不确认。
channel.BasicConsume(queue: "hello",
                    autoAck: false,
                    consumer: consumer);


channel.BasicConsume(queue: "hello",
                    autoAck: true,
                    consumer: consumer);

如果你只运行第一个消费者,消息永远不会从队列中删除,因为消费者声明只有客户端手动确认消息才会从队列中消失:https://www.rabbitmq.com/confirms.html

然而,第二个消费者会告诉队列它可以安全地自动/立即删除它收到的所有消息。

如果您不想自动删除这些消息,则必须禁用 autoAck 并使用文档进行一些手动确认:

http://codingvision.net/tips-and-tricks/c-send-data-between-processes-w-memory-mapped-file (向下滚动到“手动确认”)。
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);

关于rabbitmq - RabbitMQ 如何决定何时删除消息?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48816530/

相关文章:

algorithm - 插入优先队列。 MIT c编程开放课件

php - AMQP - 连接超时

java - 在 Qpid 上调用的 RabbitMQ Java API queuePurge() 方法返回消息计数为零的 PurgeOK 响应

kubernetes - Kubernetes 上的 Apache flink - 如果作业管理器崩溃,则恢复作业

mysql - 使用MySQL NDB集群实现负载均衡和HA

java - 如果出现问题并且我想重试,使用相同的 Executor 重新排队 Runnable 是否安全或明智?

c# - 从 Docker 容器内部使用 RabbitMQ

api - RabbitMQ HTTP API 请求 401 未经授权

java - 在Spring中为HighlyAvailableGraphDatabase(嵌入)配置Neo4j 1.9.3

windows - MSMQ 触发的 powershell - 触发但对转发的消息不执行任何操作