我在我的消费者内部抛出了一个 AmqpException。
我的期望是消息会以 FIFO 的顺序返回到队列中,并且会在将来的某个时候重新处理。
似乎 Spring AMQP 不会将消息释放回队列。而是尝试一遍又一遍地重新处理失败的消息。
这会阻止处理新到达的消息。被卡住的那些在 AMQP 控制台中永远显示为“解压缩”状态。
有什么想法吗?
最佳答案
这就是 rabbitmq/Spring AMQP 的工作方式;如果消息被拒绝(抛出任何异常),默认情况下该消息将重新排队并放回队列的头部,以便立即重试。
... reprocessed sometime in the future.
您必须适本地配置一些东西才能实现这一点。
首先,您必须告诉代理不要重新排队消息。这是通过设置
defaultRequeueRejected
来完成的在监听器容器上设置为 false(默认情况下为 true)。或者,您可以抛出 AmqpRejectAndDontRequeueException
它指示容器拒绝(而不是重新排队)单个消息。但这还没有结束。这样做只会导致被拒绝的消息被丢弃。
为避免这种情况,您必须设置一个 Dead Letter Exchange/Queue对于队列 - 被拒绝的消息然后被发送到 DLX/DLQ 而不是被丢弃。通常建议使用策略而不是队列参数。
最后,您可以设置消息在 DLQ 上的生存时间,以便在此时间之后,从队列中删除消息。如果您在 上设置了另一个合适的死信交换那个队列(DLQ),您可以使消息在时间到期后重新排队回到原始队列。
请注意,这仅适用于来自原始队列的拒绝交付;当该队列中的消息过期时,它将不起作用。
见 this answer以及其问题中的一些链接以获取更多详细信息。
您可以使用
x-death
的内容 header 来决定您是否应该在多次尝试后完全放弃(捕获异常并以某种方式处理坏消息;不要抛出异常,容器将确认消息)。
关于带有 spring amqp 的 rabbitmq - 出现 AmqpException 时消息卡住,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28357820/