我有一个应用程序,我在 Kafka 中发布了有关该主题的大量消息。目前,我通过配置 Kafka 监听器来一条一条地消费消息,如下所示:
@KafkaListener(id = "groupId", topics = "topic-name"})
public void consumeEvent(MyPojo myPojo) {
// Process the message one by one
}
上述在 Kafka 中处理消息的方式的问题是:上面的消费者代码中有机会同时处理大量事件。我正在寻找一种配置,可以帮助将一批消息集中在一起,例如一次 500 条消息。我们有什么办法可以使用 Spring Boot 来实现这一点吗?如果我们批量处理这样的消息,是否存在任何问题或我们可能需要处理/照顾的任何其他事情?
@KafkaListener(id = "groupId", topics = "topic-name"})
public void consumeEvents(List<MyPojo> myPojoItems) {
// Process the messages in bulk
}
最佳答案
我在我的项目中使用了@KafkaListener批处理属性。
请参阅https://docs.spring.io/spring-kafka/api/org/springframework/kafka/annotation/KafkaListener.html了解更多详情。
@KafkaListener(topics = TOPIC, groupId = GROUP_ID, batch = "true")
void messageListener(ConsumerRecords<String, Message> records) {
for (ConsumerRecord<String, Message> cr : records) {
Message message = cr.value();
}
}
关于java - 使用@KafkaListener在Kafka中批量消费消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/76421584/