java - 具有相同 GroupId 的多个 Kafka 监听器都接收消息

标签 java spring spring-boot apache-kafka spring-kafka

我在 Spring Boot 应用程序中配置了一个 kafka 监听器,如下所示:

@KafkaListener(topicPartitions = @TopicPartition(topic = 'data.all', partitions = { "0", "1", "2" }), groupId = "kms")
public void listen(ObjectNode message) throws JsonProcessingException {
    // Code to convert to json string and write to ElasticSearch
}

此应用程序部署到 3 台服务器并在 3 台服务器上运行,尽管所有服务器的组 ID 均为 kms,但它们都获得了消息的副本,这意味着我在 Elastic 中获得了 3 条相同的记录。当我在本地运行实例时,会写入 4 个副本。

通过检查写入发生前后主题上所有消息的计数,我确认生产者仅向该主题写入 1 条消息;它只增加 1。我怎样才能防止这种情况发生?

最佳答案

当您像这样手动分配分区时,您负责在实例之间分配分区。

出于分区分布的目的,该组被忽略,但如果需要,仍可用于跟踪偏移量。

您必须使用组管理并让 Kafka 为您分配分区,或者为每个实例手动分配分区。

不要使用topicPartitions,而是使用topics = "data.all"

关于java - 具有相同 GroupId 的多个 Kafka 监听器都接收消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56551368/

相关文章:

spring - 在 Spring Security 中实现每组授权的最佳方式

java - Firebase:在单个事务中更新多个对象

java - 如何访问java程序中触发的graphql查询?

node.js - org.springframework.web.multipart.MultipartException : Could not parse multipart servlet request. ..流意外结束

java - Spring 3.0.3 和 JMS 2.0 支持

java - 在 Spring Rest Controller 中执行的方法明显比在纯 java 项目中慢

java - Android 登录 Activity 后重定向

Java: "please wait"GlassPane 未正确加载

java - 即使在成功编译后运行包含另一个 Java 类的文件时也会出现 "Cannot find symbol error"

java - 带一个自增的Spring JPA复合PK