java - Spring + RabbitMQ Exponential Backoff with RetryTemplate 无响应

标签 java spring rabbitmq spring-amqp dead-letter

我正在从事一个 Spring 项目,并且正在尝试为 RabbitMQ 队列实现带有死信的指数退避。
在此过程中,我创建了一个死信队列和一个死信交换 (Fanout),并将原始队列的 x-dead-letter-exchange 参数设置为死信交换的名称,并创建了一个带有 ExponentialBackOffPolicy 的 RetryTemplate .
出于测试目的,我的消费者只是通过抛出异常来拒绝它收到的所有消息。

这是我的 RabbitMQConfiguration 类的样子:

@Configuration
@EnableAutoConfiguration
@PropertySource("file:${HOME}/common/config/wave-planning.properties")
public class RabbitMQConfiguration {

    private final static String QUEUE_NAME = "orderPlanQueue";

    private static final String EXCHANGE_NAME = "orderPlanExchange";

    private static final String DL_EXCHANGE_NAME = "deadLetterExchange";

    private static final String DL_QUEUE_NAME = "deadLetterQueue";

    @Value("${rabbitmq.host:localhost}")
    private String host;

    @Value("${rabbitmq.port:5672}")
    private int port;

    @Value("${rabbitmq.user:guest}")
    private String userName;

    @Value("${rabbitmq.password:guest}")
    private String password;

    @Value("${rabbitmq.initial_backoff_interval:1000}")
    private int INITIAL_INTERVAL_IN_MILLISECONDS;

    @Value("${rabbitmq.max_backoff_interval:10000}")
    private int MAX_INTERVAL_IN_MILLISECONDS;

    @Autowired
    OrderPlanService orderPlanService;

    @Bean
    Queue queue() {
        Map<String, Object> qargs = new HashMap<String, Object>();
        qargs.put("x-dead-letter-exchange", DL_EXCHANGE_NAME);
        return new Queue(QUEUE_NAME, false, false, false, qargs);
    }

    @Bean
    TopicExchange exchange() {
        return new TopicExchange(EXCHANGE_NAME);
    }

    @Bean
    FanoutExchange deadLetterExchange() { return new FanoutExchange(DL_EXCHANGE_NAME); }

    @Bean
    Queue deadLetterQueue() { return new Queue(DL_QUEUE_NAME); }

    @Bean
    Binding deadLetterBinding(Queue deadLetterQueue, FanoutExchange deadLetterExchange) {
        return BindingBuilder.bind(deadLetterQueue).to(deadLetterExchange);
    }

    @Bean
    Binding binding(Queue queue, TopicExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(QUEUE_NAME);
    }

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host);
        connectionFactory.setPort(port);
        connectionFactory.setUsername(userName);
        connectionFactory.setPassword(password);
        return connectionFactory;
    }

    @Bean
    public MessageConverter Jackson2JsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public AmqpTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());

        RetryTemplate retry = new RetryTemplate();
        ExponentialBackOffPolicy policy = new ExponentialBackOffPolicy();

        policy.setInitialInterval(INITIAL_INTERVAL_IN_MILLISECONDS);
        policy.setMultiplier(2);
        policy.setMaxInterval(MAX_INTERVAL_IN_MILLISECONDS);

        retry.setBackOffPolicy(policy);
        template.setRetryTemplate(retry);

        template.setRoutingKey(QUEUE_NAME);
        template.setMessageConverter(Jackson2JsonMessageConverter());
        return template;
    }

    @Bean
    SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setMessageConverter(Jackson2JsonMessageConverter());
        container.setQueueNames(QUEUE_NAME);
        container.setMessageListener(listenerAdapter);
        container.setDefaultRequeueRejected(false);
        return container;
    }

    @Bean
    MessageListenerAdapter listenerAdapter() {
        return new MessageListenerAdapter(orderPlanService, "consume");
    }
}

消费者的相关部分基本上是这样的:

@Service
@Transactional
public class BaseOrderPlanService implements OrderPlanService {

    ....

    @Override
    public void consume(Object object) {
        throw new IllegalArgumentException("Test");
    }
}

对于 Autowiring 的整数值,使用默认值。
在运行它时,我看到交换器和队列是按预期在 rabbitmq 上创建的,具有预期的绑定(bind)和相关参数。
但是,当我使用路由键“orderPlanQueue”将消息传递给 orderPlanExchange 时,它​​会导致无限循环,因为消息在队列中反复被拒绝和替换。
另一方面,如果 IllegalArgumentException 被替换为 AmqpRejectAndDontRequeueException,则在第一次拒绝尝试时,消息将被简单地扔进死信队列。

如果有人能指出我在这里可能做错了什么,即未应用重试策略,我将不胜感激。

编辑: 根据 Artem 的建议使用 StatefulRetryOperationsInterceptor 进行编码。

@Configuration
@EnableAutoConfiguration
@PropertySource("file:${HOME}/common/config/wave-planning.properties")
public class RabbitMQConfiguration {

    private final static String QUEUE_NAME = "orderPlanQueue";

    private static final String EXCHANGE_NAME = "orderPlanExchange";

    private static final String DL_EXCHANGE_NAME = "deadLetterExchange";

    private static final String DL_QUEUE_NAME = "deadLetterQueue";

    @Value("${rabbitmq.host:localhost}")
    private String host;

    @Value("${rabbitmq.port:5672}")
    private int port;

    @Value("${rabbitmq.user:guest}")
    private String userName;

    @Value("${rabbitmq.password:guest}")
    private String password;

    @Value("${rabbitmq.initial_backoff_interval:1000}")
    private int INITIAL_INTERVAL_IN_MILLISECONDS;

    @Value("${rabbitmq.max_backoff_interval:10000}")
    private int MAX_INTERVAL_IN_MILLISECONDS;

    @Autowired
    OrderPlanService orderPlanService;

    @Bean
    Queue queue() {
        Map<String, Object> qargs = new HashMap<String, Object>();
        qargs.put("x-dead-letter-exchange", DL_EXCHANGE_NAME);
        return new Queue(QUEUE_NAME, false, false, false, qargs);
    }

    @Bean
    TopicExchange exchange() {
        return new TopicExchange(EXCHANGE_NAME);
    }

    @Bean
    Binding binding(Queue queue, TopicExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(QUEUE_NAME);
    }

    @Bean
    FanoutExchange deadLetterExchange() { return new FanoutExchange(DL_EXCHANGE_NAME); }

    @Bean
    Queue deadLetterQueue() { return new Queue(DL_QUEUE_NAME); }

    @Bean
    Binding deadLetterBinding(Queue deadLetterQueue, FanoutExchange deadLetterExchange) {
        return BindingBuilder.bind(deadLetterQueue).to(deadLetterExchange);
    }

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host);
        connectionFactory.setPort(port);
        connectionFactory.setUsername(userName);
        connectionFactory.setPassword(password);
        return connectionFactory;
    }

    @Bean
    public MessageConverter Jackson2JsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public AmqpTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());

        /*
        RetryTemplate retry = new RetryTemplate();
        ExponentialBackOffPolicy policy = new ExponentialBackOffPolicy();

        policy.setInitialInterval(INITIAL_INTERVAL_IN_MILLISECONDS);
        policy.setMultiplier(2);
        policy.setMaxInterval(MAX_INTERVAL_IN_MILLISECONDS);

        retry.setBackOffPolicy(policy);
        template.setRetryTemplate(retry);
        */

        template.setRoutingKey(QUEUE_NAME);
        template.setMessageConverter(Jackson2JsonMessageConverter());
        return template;
    }

    @Bean
    StatefulRetryOperationsInterceptor interceptor() {
        return RetryInterceptorBuilder.stateful()
                .maxAttempts(4)
                .backOffOptions(INITIAL_INTERVAL_IN_MILLISECONDS, 2, MAX_INTERVAL_IN_MILLISECONDS)
                .build();
    }

    @Bean
    SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setMessageConverter(Jackson2JsonMessageConverter());
        container.setQueueNames(QUEUE_NAME);
        container.setMessageListener(listenerAdapter);
        container.setAdviceChain(new Advice[] {interceptor()});
        return container;
    }

    @Bean
    MessageListenerAdapter listenerAdapter() {
        return new MessageListenerAdapter(orderPlanService, "consume");
    }

}

最佳答案

RabbitTemplate 的重试策略与 DLQ/DLX 完全无关。这是针对消费者的。

看引用手册的区别here :

you can now configure the RabbitTemplate to use a RetryTemplate to help with handling problems with broker connectivity.

here :

To put a limit in the client on the number of re-deliveries, one choice is a StatefulRetryOperationsInterceptor in the advice chain of the listener.

因此,您必须重新考虑您的逻辑并将重试功能添加到 SimpleMessageListenerContainer 定义中。

关于java - Spring + RabbitMQ Exponential Backoff with RetryTemplate 无响应,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45019585/

相关文章:

java - Spring HandlerInterceptors 是如何实例化的?

java - 如何将值从 Thymeleaf 传递到 Ajax 调用

java - Spring JUnit4 手动/自动布线困境

jquery - 使用 Spring 和 Ajax 从 Post 方法接收 Rest Controller 中的参数

php - 在 RabbitMQ PHP 中设置消息优先级

java - JCombobox 列表背景颜色覆盖选定的背景项目颜色

Java EE 示例应用程序

java - 在 spring 集成中如何仅使用 java 注释配置消息网关,并确保网关看到回复

php - 如何使用 PHP 从 rabbitMq 队列中获取单个消息?

c - amqp_login 给出 "Argument list too long"错误?