我有两个具有不同客户端 ID 和组 ID 的消费者。除了保留时间和最大分区之外,我的 Kafka 安装还包含默认配置。我环顾四周,看看其他人是否也遇到过同样的问题,但无法得出任何结果。
所以场景是这样的:
消费者A: 连接到Kafka,消费大约300万条需要消费的消息,然后闲置等待更多消息。
消费者B: 不同的客户端/组ID,连接到相同的Kafka主题,这会导致消费者A重复获得300万条消息,而消费者B也消费它们。
这两个使用者是两个完全不同的 Java 应用程序,具有不同的客户端和组 ID,运行在同一台计算机上。 Kafka 服务器在另一台计算机上。
这是 Kafka 中的正常行为吗?我完全不知所措。
这是我的消费者配置:
bootstrap.servers=192.168.110.109:9092
acks=all
max.block.ms=2000
retries=0
batch.size=16384
auto.commit.interval.ms=1000
linger.ms=0
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
block.on.buffer.full=true
enable.auto.commit=false
auto.offset.reset=none
session.timeout.ms=30000
zookeeper.session.timeout=100000
rebalance.backoff.ms=8000
group.id=consumerGroupA
zookeeper.connect=192.168.110.109:2181
poll.interval=100
我的消费者 B 的明显区别是 group.id=consumerGroupB
最佳答案
这是正确的行为。因为根据您的配置,您的消费者不会提交他们已读取的记录的偏移量!
当消费者读取一条记录时,它必须提交读取该记录,您可以通过设置enable.auto.commit=true
来确保消费者自动提交偏移量或手动提交每条记录。在这种情况下,我认为自动提交适合您。
关于java - 两个 Kafka 消费者互相造成奇怪的行为,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43279936/