rabbitmq - 如何读取 RabbitMQ 未确认消息/RabbitMQ 周期

标签 rabbitmq rabbitmqctl

我想读取 RabbitMQ 队列中未确认消息的负载或 messageId。这可能吗?

我想这样做的原因是我尝试使用 RabbitMQ 死信功能来构建一个循环以定期自动生成消息。 简而言之,创建两个队列 - 工作队列和延迟队列。

  1. 设置延迟队列中消息的TTL为需要周期性发送的时间频率。可以针对不同的工作目的使用不同的 TTL 的不同消息;
  2. 将消息放入延迟队列。当消息过期时,它会重新发布到工作队列中。消息可以在工作队列中停留任意长的时间,直到消费者开始使用它。
  3. 一个消费者获取消息并处理它。如果处理成功,消费者需要确认工作队列,然后将消息写回延迟队列;如果处理失败(例如,线程崩溃),则不进行确认。然后消息会自动重新出现在工作队列中。然后另一个消费者可以接手这项工作。当发送回延迟队列的消息再次过期时,它会被重新发布,然后被消费者重新消费......一个循环构建,工作负载分配。

我想确保循环中没有丢失或重复的消息,因为我不想丢失工作或同时重复工作。但是,重复消息发生的可能性很小。下面显示消费者首先将消息写回延迟队列,并确认工作队列。如果线程在下面两行之间崩溃,消息将在延迟队列中,Rabbit 将消息重新发布到工作队列中。最终在循环中出现重复的消息。

  channel.basicPublish(DELAY_EXCHANGE, "", null, message.getBytes());
  channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

为了防止上述情况,我想在上面两行之后添加一个 dog watch 逻辑:

  1. 检查循环中的消息总数(两个队列中的消息总数)是否等于我的预期数量(我预期的数量减去10);

  2. 如果数字不匹配,我想弄清楚哪个缺失或哪个重复,然后进行处理。我不关心这些消息的顺序,或者频率被打乱了,因为这是一个需要考虑的非常边缘的情况。我可以很容易地检索那些准备好的消息并重新排队。但问题是如何处理那些未确认的消息?

非常感谢您!

罗伊

最佳答案

无法从其他上下文中读取未确认的消息,因为原始消息已被消费并保持为未确认状态。

关于rabbitmq - 如何读取 RabbitMQ 未确认消息/RabbitMQ 周期,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26857845/

相关文章:

java - RabbitMQ:如何在 "subscription added"上创建监听器

rabbitmq - rabbitmqadmin - 无法连接 : [Errno -2] Name or service not known

python - 同时从 Rabbitmq 接收日志并运行您的 flask 应用程序

rabbitmq 列出所有虚拟主机上的队列

go - Go 中的 RabbitMQ 消费者

c# - 什么开源消息队列软件提供严格排序的持久性?

rabbitmq - 使用 MassTransit 时默认情况下发布商确认是否处于事件状态?

PHP fatal error : Class 'AMQPConnection' not found

java - 在 RabbitMQ 和 Spring 中是否可以选择创建只接受高优先级消息的消费者?

RabbitMQ:发布者确认中的奇怪行为