java - 未使用 Spring AMQP StatefulRetryOperationsInterceptor

标签 java spring rabbitmq spring-amqp spring-retry

我正在尝试将 spring amqp 配置为仅重试消息定义的次数。当前失败的消息,例如因为 DataIntegrityViolationException 被无限期地重新传送。

根据文档here我想出了以下配置

@Bean
    public StatefulRetryOperationsInterceptor statefulRetryOperationsInterceptor() {
        return RetryInterceptorBuilder.stateful()
                .backOffOptions(1000, 2.0, 10000) // initialInterval, multiplier, maxInterval
                .maxAttempts(3)
                .messageKeyGenerator(message -> UUID.randomUUID().toString())
                .build();
    } 

这似乎没有应用 - 消息仍在无限期地尝试。

感觉好像我在这里遗漏了什么。

这是我关于 AMQP 的剩余配置:

@Bean
    Queue testEventSubscriberQueue() {
        final boolean durable = true;
        return new Queue("testEventSubscriberQueue", durable);
    }

    @Bean
    Binding binding(TopicExchange topicExchange) {
        return BindingBuilder.bind(testEventSubscriberQueue()).to(topicExchange).with("payload.event-create");
    }

    @Bean
    SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(testEventSubscriberQueue().getName());
        container.setMessageListener(listenerAdapter);
        container.setChannelTransacted(true);
        return container;
    }


    @Bean
    MessageListenerAdapter listenerAdapter(MessageConverter messageConverter, SubscriberHandler subscriberHandler) {
        MessageListenerAdapter listenerAdapter = new MessageListenerAdapter(subscriberHandler);
        listenerAdapter.setMessageConverter(messageConverter);
        return listenerAdapter;
    }

    @Bean
    public MessageConverter messageConverter(ObjectMapper objectMapper) {
        final Jackson2JsonMessageConverter jsonMessageConverter = new Jackson2JsonMessageConverter();
        jsonMessageConverter.setJsonObjectMapper(objectMapper);
        DefaultClassMapper defaultClassMapper = new DefaultClassMapper();
        defaultClassMapper.setDefaultType(EventPayload.class);
        jsonMessageConverter.setClassMapper(defaultClassMapper);
        final ContentTypeDelegatingMessageConverter messageConverter = new ContentTypeDelegatingMessageConverter(jsonMessageConverter);
        messageConverter.addDelgate(MessageProperties.CONTENT_TYPE_JSON, jsonMessageConverter);
        return messageConverter;
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, MessageConverter messageConverter) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(messageConverter);
        //rabbitTemplate.setChannelTransacted(true);
        return rabbitTemplate;
    }

@Bean
    public TopicExchange testExchange() {
        final boolean durable = true;
        final boolean autoDelete = false;
        return new TopicExchange(EXCHANGE_NAME, durable, autoDelete);
    }

我正在使用 spring-amqp 1.5.1.RELEASE。

感谢任何帮助。

最佳答案

您需要配置容器以将拦截器添加到其建议链中...

container.setAdviceChain(new Advice[] { statefulRetryOperationsInterceptor() });

关于java - 未使用 Spring AMQP StatefulRetryOperationsInterceptor,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33611453/

相关文章:

java - 十进制值问题

java - Spring Boot 和日志记录

.net-core - 配置 MassTransit 以使用 WebApplicationFactory<Startup> 进行测试

eclipse - 在 Tomcat 上使用 Eclipse 使用 Rabbit MQ 消息

c# - 在 C# 中,如何处理当前队列中的所有 RabbitMQ 消息?

Java - 为什么不允许 Enum<E> 作为 Annotation 成员?

java - com.sun.xml.bind.v2.bytecode.ClassTailor.noOptimize 的用法和性能

java - 从java类中提取所有字段

java - 如何从 Servlet 中的 WebApplicationContext 检索通常由 @Value 连接的值?

mysql - javax.resource.ResourceException : IJ000453: Unable to get managed connection