java - 有没有办法通过 API 重置 Kafka 消费者组的偏移量?

标签 java spring spring-boot apache-kafka spring-kafka

我有一个用例,其中有一个消费者组正在消费消息。我想构建一个 API 来修改其偏移量。因此,当使用偏移量调用端点时,我必须更改消费者组的偏移量。我正在使用 SpringBoot,消费者是使用 Spring Kafka 构建的。 提前致谢。

最佳答案

这是通过 CLI 的解决方案:

列出该群组订阅的主题:

kafka-consumer-groups --bootstrap-server <kafkahost:port> --group <group_id> --describe

记下“CURRENT-OFFSET”和“LOG-END-OFFSET”下的值。 “CURRENT-OFFSET”是该消费者组当前在每个分区中的偏移量。

重置主题的消费者偏移量(预览):

kafka-consumer-groups --bootstrap-server <kafkahost:port> --group <group_id> --topic <topic_name> --reset-offsets --to-earliest

这将打印重置的预期结果,但不会实际运行它。

重置主题的消费者偏移量(执行):

kafka-consumer-groups --bootstrap-server <kafkahost:port> --group <group_id> --topic <topic_name> --reset-offsets --to-earliest --execute

这将执行重置并将指定主题的消费者组偏移量重置回 0。

重复1检查重置是否成功

关于java - 有没有办法通过 API 重置 Kafka 消费者组的偏移量?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/67122416/

相关文章:

java - 如何在 Eclipse Juno 中禁用星号导入

java - Spring Boot 冷启动

java - 没有 apk 文件存在 PhoneGap 项目

java - 如何回滚微服务之间发生的事务?

spring - Ignite 和 Spring Boot

java - Kafka 8.2.2 动态主题删除第一个事件

java - 推土机+ Spring : MappingException: Dozer Bean Mapper is already initialized

java - 使用构造函数注入(inject)时如何避免循环依赖

java - Spring WebFlux WebClient - 如何解决 400 错误请求

java - 在 @Size 中使用 Spring @Value