我在 Spring Boot 应用程序中配置了一个 kafka 监听器,如下所示:
@KafkaListener(topicPartitions = @TopicPartition(topic = 'data.all', partitions = { "0", "1", "2" }), groupId = "kms")
public void listen(ObjectNode message) throws JsonProcessingException {
// Code to convert to json string and write to ElasticSearch
}
此应用程序部署到 3 台服务器并在 3 台服务器上运行,尽管所有服务器的组 ID 均为 kms
,但它们都获得了消息的副本,这意味着我在 Elastic 中获得了 3 条相同的记录。当我在本地运行实例时,会写入 4 个副本。
通过检查写入发生前后主题上所有消息的计数,我确认生产者仅向该主题写入 1 条消息;它只增加 1。我怎样才能防止这种情况发生?
最佳答案
当您像这样手动分配分区时,您负责在实例之间分配分区。
出于分区分布的目的,该组被忽略,但如果需要,仍可用于跟踪偏移量。
您必须使用组管理并让 Kafka 为您分配分区,或者为每个实例手动分配分区。
不要使用topicPartitions
,而是使用topics = "data.all"
关于java - 具有相同 GroupId 的多个 Kafka 监听器都接收消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56551368/