java - 如何稍后重新使用被拒绝的消息,RabbitMQ

标签 java rabbitmq

有时由于一些外部问题,我需要通过 basic.reject 并设置 requeue = true 重新排队消息。

但我不需要立即消耗它,因为它可能会在短时间内再次失败。如果我不断地重新排队,这可能会导致无限循环和重新排队。

  1. 所以我需要稍后再吃它,比如说一分钟后,

  2. 我需要知道消息已重新排队多少次,以便我可以停止重新排队,而只是拒绝它以声明它无法使用。

PS:我使用的是Java客户端。

最佳答案

第 1 点有多种解决方案。

第一个是 Celery 选择的(一个可以使用 RabbitMQ 作为代理的 Python 生产者/消费者库)。在消息中,添加应执行任务的时间戳。当您的消费者收到消息时,不要确认它并检查其时间戳。一旦达到时间戳,工作人员就可以执行任务。 (请注意,工作人员可以继续执行其他任务而不是等待)
这种技术有一些缺点。您必须将每个 channel 的 QoS 增加到任意值。如果您的工作人员已经在处理长时间运行的任务,则延迟的任务将在第一个任务完成之前不会执行。

第二种技术仅适用于 RabbitMQ,而且更加优雅。它利用dead-letter exchangesMessages TTL 。您创建了一个不被任何人使用的新队列。该队列有一个死信交换,它将消息转发到消费者队列。当您想要延迟消息时,从消费者队列中确认它(或拒绝它而不重新排队),并将消息复制到死信队列中,其 TTL 等于您想要的延迟(例如一分钟后)。在(大致)TTL 结束时,延迟的消息将神奇地再次进入消费者队列,准备被消费。 RabbitMQ 团队还制作了Delayed Message Plugin (此插件被标记为实验性但相当稳定,并且只要用户意识到其局限性,就可能适合生产使用,并且在故障转移时的可扩展性和可靠性方面具有严重限制,因此您可以决定是否真的要在生产中使用它,或者如果您更喜欢坚持手动方式,每个队列仅限一个 TTL)。

第 2 点只需要在您的消息中放置一个计数器并在您的应用程序中处理它。您可以选择将此计数器放在标题中或直接放在正文中。

关于java - 如何稍后重新使用被拒绝的消息,RabbitMQ,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46679275/

相关文章:

php - RabbitMqBundle 消费者退出时出现异常 "Error reading data. Received 0 instead of expected 1 byte"和 "Broken pipe or closed connection"

docker - 在 Docker 中使用 RabbitMQ 删除容器

websocket - 更改 RabbitMQ WebSocket MQTT 端点的路径?

java - 在两个线程和主程序之间共享一个对象

java - 使用 Apache POI 在 java 中读取 excel (xlsx) 文件(得到奇怪的数字)

java - JAXB/MOXy : Do not call XmlElementWrapper setter when element missing?

python - 如何使用 pyramid_sockjs 的消息传递?

windows - RabbitMq 和 "Fatal error: handshake failure - handshake_decode_error"

java - 如何在 XMLGregorianCalendar 中设置带有区域的时间戳

java - 找不到 hibernate.cfg.xml (该文件实际上不存在)。配置位于dispatcher-servlet中