我正在使用 spring amqp 使用来自 rabbitmq 的消息。 我一次使用一条消息,它很慢,因为我将它保存到数据库中。所以每次都打开和关闭交易。
现在我已经设置了一个这样的消费者。
@RabbitListener(queues = "queuename")
public void receive(Message message) {
someservice.saveToDb(message);
}
但这真的很慢。我想在开始保存之前先消费一堆消息。然后我可以打开一个交易。保存 300,然后提交并加载下一批。
这样的东西行得通吗?
class MessageChannelTag {
Message message;
Channel channel;
long tag;
}
@Component
class ConsumerClass {
List<MessageChannelTag> messagesToSave = new ArrayList<>();
@RabbitListener(queues = "queuename")
public void receive(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag)
throws IOException {
messagesToSave.add(new MessageChannelTag(message, channel, tag));
}
@Scheduled(fixedDelay=500)
public void saveMessagesToDb() {
List saveTheese = new ArrayList(messagesToSave);
messagesToSave.clear();
service.saveMessages(saveTheese);
for(MessageChannelTag messageChannelTag:messagesToSave) {
//In the service I could mark the rows if save succeded or not and
//then out here I could ack or nack..
messageChannelTag.getChannel().basicAck(messageChannelTag.getTag(), false);
}
}
}
或者,如果有更简单的解决方案,请告诉我。我更喜欢快速、简单和健壮 =)
最佳答案
“上游”生产者是否可以提供批量消息而不是单个消息,这可能也值得研究。
关于java - 如何在没有确认的情况下消费 100 条消息,然后工作然后确认它们?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55977102/