java - Spring AMQP (Rabbit) 监听器在异常情况下进入循环

标签 java spring rabbitmq spring-amqp

@Bean
RabbitTemplate rabbitTemplate() {
    RabbitTemplate template = new RabbitTemplate(rabbitConnectionFactory());
    template.setMessageConverter(messageConverter);
    template.setExchange(amqpProperties.getRabbitMqTopicExchangeName());
    return template;
}

@Bean
@Conditional (OperationsCondition.class)
 SimpleMessageListenerContainer opsMessageListenerContainer() {
    return listenerContainer(amqpProperties.getRabbitMqOperationsQueue(), 
            amqpProperties.getInitialRabbitOperationsConsumerCount(), 
            amqpProperties.getMaximumRabbitOperationsConsumerCount(),
            opsReceiver());
}

@Bean
@Conditional (OperationsCondition.class)
OperationsListener opsReceiver() {
    return new OperationsListener();
}

private SimpleMessageListenerContainer listenerContainer(String queue,
        int initConsumers,int maxConsumers, MessageListener listener)
{
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(rabbitConnectionFactory());
    container.setQueueNames(queue);
    container.setMessageListener(listener);
    container.setConcurrentConsumers(initConsumers);
    container.setMaxConcurrentConsumers(maxConsumers);
    container.setMessageConverter(messageConverter);
    return container;
}

消息监听器是:

public class OperationsListener  implements MessageListener
{
    public static final Logger logger = Logger.getInstance(OperationsListener.class);

    @Autowired (required=true)
    private OperationsProcessor processor;
    @Autowired (required=true)
    private ObjectMapper objectMapper;

    public void onMessage(Message message)
    {
        Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();        
        converter.setJsonObjectMapper(objectMapper);
        OperationsMessage request = (OperationsMessage)converter.fromMessage(message);
        processor.createMessage(request);
        //This is throwing a JPA database exception
        processor.createOperation(request);
    }
}

processor.createOperation() 由于数据库问题抛出异常。问题是消息监听器正在循环并且消息不断返回。

我的处理器类:

@Component
@Transactional (propagation = Propagation.REQUIRES_NEW) 
public class OperationsProcessor
{
...............

    public void createOperation(OperationsMessage message)
    {
            try
            {
                .............
                .............
                //this call throws exception.
                opsRepo.create(operation,null);
            }
            catch (Exception e)
            {
                logger.error(e);
            }

    }
}

opsRepo.create 抛出异常。尽管我发现了错误,但我希望 spring amqp 不会再次发送该消息。不确定为什么相同的消息不断返回。

编辑:

我想我找到了一些关于如何处理这个问题的建议。原因是 spring 在失败时请求事件,这是默认的性质。 找到一个有用的线程 herehere .

最佳答案

关于java - Spring AMQP (Rabbit) 监听器在异常情况下进入循环,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43009597/

相关文章:

mysql - 如果池中的连接在Spring中空闲超过一段时间,MySql是否有任何超时功能?

java - 如何在 Spring Boot 中将 Json 字节数组转换为 AMQP?

java - Spring AMQP Listener Container 中的并发是如何实现的?

java - 自定义 spring boot 自动配置未检测到 bean

docker - Docker 中 RabbitMQ 的端口转发失败

java - 使用连接池进行 Spring Boot 和数据库测试

java - 如何根据 localStorage 数据设置默认下拉值和选择

java - 在 Ubuntu 中使用 Java 创建一个文件夹和一个文件

java - 使用 Spring 注入(inject) TaskScheduler

java - 如何处理@Transactional中的提交和回滚?