java - Spring RabbitMQ - 在带有@RabbitListener 配置的服务上使用手动 channel 确认

标签 java rabbitmq spring-amqp spring-rabbit

如何在不使用自动确认的情况下手动确认消息。 有没有一种方法可以将其与 @RabbitListener@EnableRabbit 配置样式一起使用。 大多数文档告诉我们将 SimpleMessageListenerContainerChannelAwareMessageListener 一起使用。 然而,使用它我们失去了注释提供的灵 active 。 我已将我的服务配置如下:

@Service
public class EventReceiver {

@Autowired
private MessageSender messageSender;

@RabbitListener(queues = "${eventqueue}")
public void receiveMessage(Order order) throws Exception {

  // code for processing order
}

我的 RabbitConfiguration 如下

@EnableRabbit
public class RabbitApplication implements RabbitListenerConfigurer {

public static void main(String[] args) {
    SpringApplication.run(RabbitApplication.class, args);
}

@Bean


public MappingJackson2MessageConverter jackson2Converter() {
        MappingJackson2MessageConverter converter = new  MappingJackson2MessageConverter();
        return converter;
    @Bean
public SimpleRabbitListenerContainerFactory myRabbitListenerContainerFactory() {
      SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
      factory.setConnectionFactory(rabbitConnectionFactory());
      factory.setMaxConcurrentConsumers(5);
      factory.setMessageConverter((MessageConverter) jackson2Converter());
      factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
      return factory;
    }

@Bean
public ConnectionFactory rabbitConnectionFactory() {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
    connectionFactory.setHost("localhost");
    return connectionFactory;
}

@Override
public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
    registrar.setContainerFactory(myRabbitListenerContainerFactory());
}

@Autowired
private EventReceiver receiver;
}
}

任何有关如何调整手动 channel 确认以及上述配置样式的帮助将不胜感激。 如果我们实现 ChannelAwareMessageListener,那么 onMessage 签名将会改变。 我们可以在服务上实现 ChannelAwareMessageListener 吗?

最佳答案

Channel 添加到@RabbitListener 方法...

@RabbitListener(queues = "${eventqueue}")
public void receiveMessage(Order order, Channel channel,
    @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
    ...
}

并使用basicAck中的标签,basicReject

编辑

@SpringBootApplication
@EnableRabbit
public class So38728668Application {

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(So38728668Application.class, args);
        context.getBean(RabbitTemplate.class).convertAndSend("", "so38728668", "foo");
        context.getBean(Listener.class).latch.await(60, TimeUnit.SECONDS);
        context.close();
    }

    @Bean
    public Queue so38728668() {
        return new Queue("so38728668");
    }

    @Bean
    public Listener listener() {
        return new Listener();
    }

    public static class Listener {

        private final CountDownLatch latch = new CountDownLatch(1);

        @RabbitListener(queues = "so38728668")
        public void receive(String payload, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag)
                throws IOException {
            System.out.println(payload);
            channel.basicAck(tag, false);
            latch.countDown();
        }

    }

}

应用程序属性:

spring.rabbitmq.listener.acknowledge-mode=manual

关于java - Spring RabbitMQ - 在带有@RabbitListener 配置的服务上使用手动 channel 确认,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38728668/

相关文章:

python - 如何确保消息被送达?

java - 从循环中返回值

Spring 集成 - channel 适配器与网关 (AMQP)

java - 通用数组创建与 Array.newInstance()

java - 在 Android 中设置视频作为背景播放

Eclipse kepler 的 Java 反编译器

java - 如何打破 for 循环相对于其他 for 循环的关系

solr - 如果我使用标准的 celeryconfig,pyramid_celery 有什么好处?

spring - 哪个版本的 Spring AMQP 和 Spring Rabbit 与 Spring 5 兼容?

java - 添加已发布的convertSendAndReceive确认