rabbitmq - 处理RabbitMQ中的死信

标签 rabbitmq messaging amqp dead-letter

TL;DR:一旦我修复了最初导致消息被拒绝的消费者代码,我需要将死信消息“重播”回它们的原始队列。

我已经为 RabbitMQ 配置了死信交换 (DLX),并成功地将被拒绝的消息路由到死信队列。但是现在我想查看死信队列中的消息并尝试决定如何处理每个消息。一旦有问题的消费者代码被修复,这些消息中的一些(许多?)应该重播(重新排队)到它们的原始队列(在“x-death” header 中可用)。但我实际上如何去做呢?我应该编写一个一次性程序来从死信队列中读取消息并允许我指定一个目标队列来发送它们吗?那么搜索死信队列呢?如果我知道一条消息(假设是用 JSON 编码的)具有我想要搜索和重播的特定属性,该怎么办?例如,我修复了一个缺陷,我知道现在可以成功处理带有 PacketId: 1234 的消息。我想我也可以为此编写一个一次性程序。

我当然不可能是第一个遇到这些问题的人,我想知道是否还有其他人已经解决了这些问题。对于这种事情,似乎应该有某种瑞士军刀。我在 Google 和 Stack Overflow 上进行了相当广泛的搜索,但并没有真正想出太多。我能找到的最接近的东西是铁锹,但这似乎不是适合这项工作的工具。

最佳答案

Should I write a one-off program that reads messages from the dead letter queue and allows me to specify a target queue to send them to?



一般来说,是的。

您可以设置延迟重试,使用 delay message exchange plugin 的组合将消息重新发送回原始队列。 .

但这只会在一个时间间隔内自动重试,并且您可能没有在重试发生之前解决问题。

在某些情况下,这是可以的 - 就像错误是由暂时不可用的外部资源引起的。

不过,就您而言,我相信您对创建应用程序来处理死信的想法是最好的方法,原因如下:
  • 您需要搜索消息,这是不可能的 RMQ
  • 这意味着您需要一个数据库来存储来自 DLX/队列的消息

  • 因为您是从 DLX/队列中提取消息,所以您需要确保从消息中获取所有 header 信息,以便在时机成熟时可以重新发布到正确的队列。

    I certainly can't be the first one to encounter these problems and I'm wondering if anyone else has already solved them.



    而你不是!

    这个问题有很多解决方案,都归结为您建议的解决方案。

    一些较大的“服务总线”实现内置了这种类型的功能。例如,我相信 NServiceBus(或它的 SaaS 版本)内置了这个功能 - 尽管我不是 100% 确定。

    如果您想进一步研究这一点,请搜索“毒药信息”一词 - 这通常是用于这种情况的术语。我通过快速搜索在谷歌上找到了一些东西,这可能会帮助你走下去:
  • http://lists.rabbitmq.com/pipermail/rabbitmq-discuss/2013-January/025019.html
  • https://web.archive.org/web/20170809194056/http://tafakari.co.ke/2014/07/rabbitmq-poison-messages/
  • https://web.archive.org/web/20170809170555/http://kjnilsson.github.io/blog/2014/01/30/spread-the-poison/

  • 希望有帮助!

    关于rabbitmq - 处理RabbitMQ中的死信,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36186578/

    相关文章:

    jms - RabbitMQ 与 Mule 相比如何

    rabbitmq - AMQP/RabbitMQ - 如何避免竞争条件

    java - 使用 RabbitMQ 实现 Saga 消息传递

    performance - ActiveMQ 5.10/QPid 0.28/AMQP 1.0性能问题

    rabbitmq - 'Publisher returns"在 Spring AMQP 中如何发生/工作?

    rabbitmq - 在 RabbitMQ 主题交换中路由与模式不匹配的消息

    java - 消耗掉所有 Kafka 主题然后立即断开连接?

    java - 在 Apache Pulsar 中授予命名空间权限

    c# - 当不再引用订阅者时,如何实现取消订阅的事件?

    node.js - Nodejs + amqp 悄然死去