我有一个用例,其中有一个消费者组正在消费消息。我想构建一个 API 来修改其偏移量。因此,当使用偏移量调用端点时,我必须更改消费者组的偏移量。我正在使用 SpringBoot,消费者是使用 Spring Kafka 构建的。 提前致谢。
最佳答案
这是通过 CLI 的解决方案:
列出该群组订阅的主题:
kafka-consumer-groups --bootstrap-server <kafkahost:port> --group <group_id> --describe
记下“CURRENT-OFFSET”和“LOG-END-OFFSET”下的值。 “CURRENT-OFFSET”是该消费者组当前在每个分区中的偏移量。
重置主题的消费者偏移量(预览):
kafka-consumer-groups --bootstrap-server <kafkahost:port> --group <group_id> --topic <topic_name> --reset-offsets --to-earliest
这将打印重置的预期结果,但不会实际运行它。
重置主题的消费者偏移量(执行):
kafka-consumer-groups --bootstrap-server <kafkahost:port> --group <group_id> --topic <topic_name> --reset-offsets --to-earliest --execute
这将执行重置并将指定主题的消费者组偏移量重置回 0。
重复1检查重置是否成功
关于java - 有没有办法通过 API 重置 Kafka 消费者组的偏移量?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/67122416/