java - Spring异步MessageListener用例发生业务异常时如何让RabbitMQ重试

标签 java spring rabbitmq

我正在运行一个 Spring AMQP 消息监听器。

public class ConsumerService implements MessageListener {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @Override
    public void onMessage(Message message) {
        try {
            testService.process(message); //This process method can throw Business Exception
        } catch (BusinessException e) {
           //Here we can just log the exception. How the retry attempt is made?
        } catch (Exception e) {
           //Here we can just log the exception.  How the retry attempt is made?
        }
    }
}

如您所见,在处理过程中可能会出现异常。由于 Catch block 中的特定错误,我想重试。我无法通过 onMessage 中的异常。 如何告诉 RabbitMQ 有异常并重试?

最佳答案

自从 onMessage()不允许抛出已检查的异常,您可以将异常包装在 RuntimeException 中并重新抛出它。

try {
    testService.process(message);
} catch (BusinessException e) {
    throw new RuntimeException(e);
}

但是请注意,这可能会导致邮件无限期地重新发送。以下是它的工作原理:

RabbitMQ 支持拒绝消息并要求代理重新排队。显示为 here .但是 RabbitMQ 本身并没有重试策略的机制,例如设置最大重试次数、延迟等。

使用 Spring AMQP 时,“requeue on reject”是默认选项。 Spring 的 SimpleMessageListenerContainer 当有未处理的异常时,默认情况下会这样做。所以在你的情况下,你只需要重新抛出捕获的异常。但是请注意,如果您无法处理消息并且总是抛出异常,则该异常将无限期地重新传递并导致无限循环。

您可以通过抛出 AmqpRejectAndDontRequeueException 来覆盖每条消息的此行为异常,在这种情况下消息不会被重新排队。

您还可以关闭 SimpleMessageListenerContainer 的“拒绝时重新排队”行为完全通过设置

container.setDefaultRequeueRejected(false) 

当一条消息被拒绝并且没有重新排队时,如果在 RabbitMQ 中设置了一个,它将丢失或转移到 DLQ。

如果您需要具有最大尝试、延迟等的重试策略,最简单的方法是设置 Spring “无状态”RetryOperationsInterceptor它将在线程内进行所有重试(使用 Thread.sleep() ),而不会在每次重试时拒绝消息(因此每次重试都不会返回 RabbitMQ)。当重试用尽时,默认情况下将记录一个警告并使用该消息。如果您想发送到 DLQ,您将需要 RepublishMessageRecoverer 或自定义 MessageRecoverer 拒绝消息而不重新排队(在后一种情况下,您还应该 setup 队列上的 RabbitMQ DLQ)。使用默认消息恢复器的示例:

container.setAdviceChain(new Advice[] {
        org.springframework.amqp.rabbit.config.RetryInterceptorBuilder
                .stateless()
                .maxAttempts(5)
                .backOffOptions(1000, 2, 5000)
                .build()
});

这显然有一个缺点,即您将在整个重试期间占用线程。您还可以选择使用“有状态”RetryOperationsInterceptor ,它将在每次重试时将消息发送回 RabbitMQ,但延迟仍将使用 Thread.sleep() 实现在应用程序中,加上设置有状态的拦截器有点复杂。

因此,如果您想要延迟重试而不占用 Thread您将需要在 RabbitMQ 队列上使用 TTL 的更复杂的自定义解决方案。如果您不想要指数退避(因此每次重试时延迟不会增加),它会更简单一些。要实现这样的解决方案,您基本上可以在 rabbitMQ 上创建另一个带有参数的队列:"x-message-ttl": <delay time in milliseconds>"x-dead-letter-exchange":"<name of the original queue>" .然后在你设置的主队列上"x-dead-letter-exchange":"<name of the queue with the TTL>" .所以现在当你拒绝并且不重新排队消息时,RabbitMQ 会将它重定向到第二个队列。当 TTL 过期时,它将被重定向到原始队列,从而重新传递给应用程序。所以现在你需要一个重试拦截器,它在每次失败后拒绝发送给 RabbitMQ 的消息,并跟踪重试计数。为了避免在应用程序中保持状态(因为如果您的应用程序是集群的,您需要复制状态),您可以从 x-death 计算重试次数。 RabbitMQ 设置的 header 。查看有关此 header 的更多信息 here .因此,此时实现自定义拦截器比使用这种行为自定义 Spring 有状态拦截器更容易。

同时检查 the section about retries in the Spring AMQP reference .

关于java - Spring异步MessageListener用例发生业务异常时如何让RabbitMQ重试,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36979840/

相关文章:

java - 如何用JAVA输入输出对话框求三个数的平均值?

java - java中的序列化与反序列化

java - 奇怪的约会行为?

java - 在 JPA、Hibernate 中验证用户名长度的最佳方法

使用 Spring4 和 Thymeleaf 的 HTML 邮件

rabbitmq - AMQP/RabbitMQ - 如何避免竞争条件

java - 如何更有效地通过 http 下载大文件?

java - Spring 测试主线程陷入应用程序的无限循环

RabbitMQ - 相同的消息是否会通过不同的连接传递给消费者

java - spring amqp rabbitmq MessageListener 不工作