rabbitmq - 在 RabbitMQ 中手动确认消息

标签 rabbitmq spring-amqp spring-rabbit

以前我正在读取队列中存在的所有消息,但现在我必须根据用户选择(计数)返回特定数量的消息。

我尝试相应地更改 for 循环,但由于自动确认,它会读取所有消息。所以我尝试在配置文件中将其更改为手动。

在我的程序中,如何在读取消息后手动确认消息(当前我使用 AmqpTemplate 来接收,并且没有 channel 引用)?

    Properties properties = admin.getQueueProperties("queue_name");
    if(null != properties)
    {
        Integer messageCount = Integer.parseInt(properties.get("QUEUE_MESSAGE_COUNT").toString());          
        while(messageCount > 0)
        {
            Message msg = amqpTemplate.receive(queue_name);
            String value = new String(msg.getBody());
            
            valueList.add(value);
            messageCount--;
        }
}

非常感谢任何帮助,提前致谢。

最佳答案

您无法使用 receive() 方法手动确认 - 使用 SimpleMessageListenerContainer 为事件驱动的消费者提供手动确认和 ChannelAwareMessageListener 。或者,使用模板的 execute() 方法,该方法使您可以访问 Channel - 但随后您将使用较低级别的 RabbitMQ API,而不是 Message 抽象。

编辑:

您需要学习底层 RabbitMQ Java API 才能使用执行,但类似这样的东西可以工作......

    final int messageCount = 3;
    boolean result = template.execute(new ChannelCallback<Boolean>() {

        @Override
        public Boolean doInRabbit(final Channel channel) throws Exception {
            int n = messageCount;
            channel.basicQos(messageCount); // prefetch
            long deliveryTag = 0;
            while (n > 0) {
                GetResponse result = channel.basicGet("si.test.queue", false);
                if (result != null) {
                    System.out.println(new String(result.getBody()));
                    deliveryTag = result.getEnvelope().getDeliveryTag();
                    n--;
                }
                else {
                    Thread.sleep(1000);
                }
            }
            if (deliveryTag > 0) {
                channel.basicAck(deliveryTag, true);
            }
            return true;
        }
    });

关于rabbitmq - 在 RabbitMQ 中手动确认消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29649545/

相关文章:

java - Spring-Boot/AMQP - 限制处理的消息数量

active-directory - 管理插件上的 RabbitMQ LDAP

java - 兔子 : Failed message being reprocessed in an infinite loop

linux - 获取 java.lang.noclassdefounderror : org. springframework/amqp/core/corelation

java - Spring消息TTL不起作用

java - 如何使用rabbitTemplate receiveAndReply

Rabbitmq:代理重启后未确认的消息不会消失

Rabbitmq beam.smp 进程无缘无故地利用了大部分内存大小

apache-kafka - 通过 MQ 传输音频流(可扩展性)

spring - Spring AMQP 相对于 RabbitMQ Java 客户端有什么优势吗?