Suppose I have a topic (test) with 2 Kafka partitions, and 2 consumer groups (X, Y), each with a single consumer consuming the topic.
Now I want to know the committed offset of the other consumer group for the same partition. The pseudocode below illustrates the requirement:
*** Let's assume this is running in the context of consumer group X
TOPIC = "test"
// consumer for group x
Consumer<K, V> consumerX = new KafkaConsumer<>(consumerProperties);
consumerX.subscribe(Collections.singletonList(TOPIC), new ReportOnRebalance(...));
// Get the currently assigned partitions; this may be empty, so keep
// checking until a partition is assigned to consumerX
Set<TopicPartition> topicPartition = consumerX.assignment();
// Get the last committed offset for group X
// (in Kafka >= 2.4, committed(Set) returns Map<TopicPartition, OffsetAndMetadata>)
offsetAndMetadataX = consumerX.committed(topicPartition)
// consumer for group y
Consumer<K, V> consumerY = new KafkaConsumer<>(consumerProperties);
// manually assign, because I am interested in the offsets for the
// partitions consumerX is going to serve
consumerY.assign(topicPartition)
// Get the last committed offset
offsetAndMetadataY = consumerY.committed(topicPartition)
// Do the required logic with offsetAndMetadataX and offsetAndMetadataY
newOffset = foo(offsetAndMetadataX, offsetAndMetadataY)
// reset the offset for consumerY in this partition
consumerY.seek(topicPartition, bar(newOffset))
// Change offset for consumerX and start polling for messages
consumerX.seek(topicPartition, newOffset)
while(...) {
consumerX.poll(..)
....
}
*** Now the same code will run in the context of consumer group Y, but with the roles reversed
consumerY.subscribe()
consumerX.assign()
...
consumerY.seek(topicPartition, bar(newOffset))
...
// Change offset for consumerY and start polling for messages
consumerY.seek(topicPartition, newOffset)
while(...) {
consumerY.poll(..)
....
}
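The reconciliation step (`foo`/`bar` above) is left abstract in the question. As a minimal sketch, assuming the goal is to resume from whichever group has committed less (so that neither group skips messages the other has not yet processed), the pure offset logic could look like this; `foo`, `bar`, and the min-offset policy are illustrative assumptions, not something the question specifies:

```java
// Hypothetical offset-reconciliation helpers. The "resume from the
// smaller committed offset" policy is an assumption for illustration;
// the real foo/bar would encode whatever skip/reprocess rule you need.
public class OffsetReconciler {

    // foo: combine the two groups' committed offsets into one target.
    // Here: take the minimum, so neither group skips unprocessed messages.
    static long foo(long committedX, long committedY) {
        return Math.min(committedX, committedY);
    }

    // bar: derive the seek position for the other group from the target.
    // Here it is the identity, but it could e.g. subtract a safety margin.
    static long bar(long newOffset) {
        return newOffset;
    }

    public static void main(String[] args) {
        long offsetX = 120L; // last committed offset for group X
        long offsetY = 95L;  // last committed offset for group Y
        long newOffset = foo(offsetX, offsetY);
        System.out.println(newOffset);      // 95
        System.out.println(bar(newOffset)); // 95
    }
}
```

The resulting `newOffset` would then be passed to `seek(topicPartition, ...)` on each consumer, as in the pseudocode above.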
I am not sure whether the above logic works. The part I am unsure about: when consumer group (X) subscribes on one machine and is assigned, say, partition (1), while the same consumer group (X) on another machine uses manual assignment and also seeks to some offset as part of that assignment. I don't know whether that will work.
Why do I want to do this? Partly to understand the usage of assign vs. subscribe, and partly because we need to manually skip the few offsets the other consumer group has already processed, or reprocess old offsets the other consumer has already handled.
Best Answer
I haven't tried what you describe here, but judging from the official documentation, it should work the way you hope.
The key part, highlighted here:
Manual partition assignment does not use group coordination, so consumer failures will not cause assigned partitions to be rebalanced. Each consumer acts independently even if it shares a groupId with another consumer. To avoid offset commit conflicts, you should usually ensure that the groupId is unique for each consumer instance.
So it looks like, basically, once you start manually assigning partitions to consumers, all the dynamic rebalancing is turned off. You should be careful, but Kafka does seem to allow the scenario you describe.
About java - Kafka managing offsets between consumer groups: we found a similar question on Stack Overflow: https://stackoverflow.com/questions/51756961/