我们将 Storm 与 Kafka 和 ZooKeeper 结合使用。我们遇到过这样一种情况,我们不得不删除一些主题并用不同的名称重新创建它们。我们的 Kafka spouts 保持不变,除了现在从新的主题名称中读取。然而,现在 spouts 在尝试读取新主题时使用旧主题分区的偏移量。因此,my-topic-name 分区 0 的尾部位置将为 500,但偏移量将类似于 10000。
有没有办法重新设置偏移位置使其与主题的尾部匹配?
最佳答案
有多个选项(因为 Storm 的 KafkaSpout
不提供任何 API 来定义起始偏移量)。
- 如果你想从日志的尾部消费你应该删除旧的偏移量
- 取决于你的 Kafka 版本
- (pre 0.9)你可以操纵 ZK(这有点棘手)
- (0.9+) 或者您尝试从主题
__consumer_offsets
中删除偏移量(这也很棘手,可能也会删除您想要保留的其他偏移量)
- 如果没有偏移量,您可以使用自动偏移量重置策略“最新”或“最大”(取决于您的 Kafka 版本)重新启动 spout
- 取决于你的 Kafka 版本
- 作为替代方案(我会推荐),您可以编写一个小型客户端应用程序,该应用程序使用
seek()
按照您需要的方式操作偏移量,并使用commit()
偏移量。此客户端必须使用与您KafkaSpout
相同的组 ID,并且必须订阅相同的主题。此外,您需要确保此客户端应用程序正在运行单个消费者组成员,以便分配所有分区。- 为此,您要么寻找日志的末尾并提交
- 或者你提交了一个无效的偏移量(比如-1)并依赖于“最新”或“最大”的自动偏移量重置配置(取决于你的 Kafka 版本)
对于 Kafka Streams,有一个“应用程序重置工具”可以做类似的事情来操纵提交的偏移量。如果你想了解一些细节,你可以阅读这篇博文http://www.confluent.io/blog/data-reprocessing-with-kafka-streams-resetting-a-streams-application/
(免责声明:我是该帖子的作者,它是关于 Kafka Streams 的——尽管如此,底层的偏移操作思想是相同的)
关于java - 如何重置 Kafka 偏移量以匹配尾部位置?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40271847/