java - 如何重置 Kafka 偏移量以匹配尾部位置?

标签 java apache-kafka apache-storm apache-zookeeper

我们将 Storm 与 Kafka 和 ZooKeeper 结合使用。我们遇到过这样一种情况,我们不得不删除一些主题并用不同的名称重新创建它们。我们的 Kafka spouts 保持不变,除了现在从新的主题名称中读取。然而,现在 spouts 在尝试读取新主题时使用旧主题分区的偏移量。因此,my-topic-name 分区 0 的尾部位置将为 500,但偏移量将类似于 10000。

有没有办法重新设置偏移位置使其与主题的尾部匹配?

最佳答案

有多个选项(因为 Storm 的 KafkaSpout 不提供任何 API 来定义起始偏移量)。

  1. 如果你想从日志的尾部消费你应该删除旧的偏移量
    • 取决于你的 Kafka 版本
      • (pre 0.9)你可以操纵 ZK(这有点棘手)
      • (0.9+) 或者您尝试从主题 __consumer_offsets 中删除偏移量(这也很棘手,可能也会删除您想要保留的其他偏移量)
    • 如果没有偏移量,您可以使用自动偏移量重置策略“最新”或“最大”(取决于您的 Kafka 版本)重新启动 spout
  2. 作为替代方案(我会推荐),您可以编写一个小型客户端应用程序,该应用程序使用 seek() 按照您需要的方式操作偏移量,并使用 commit() 偏移量。此客户端必须使用与您 KafkaSpout 相同的组 ID,并且必须订阅相同的主题。此外,您需要确保此客户端应用程序正在运行单个消费者组成员,以便分配所有分区。
    • 为此,您要么寻找日志的末尾并提交
    • 或者你提交了一个无效的偏移量(比如-1)并依赖于“最新”或“最大”的自动偏移量重置配置(取决于你的 Kafka 版本)

对于 Kafka Streams,有一个“应用程序重置工具”可以做类似的事情来操纵提交的偏移量。如果你想了解一些细节,你可以阅读这篇博文http://www.confluent.io/blog/data-reprocessing-with-kafka-streams-resetting-a-streams-application/

(免责声明:我是该帖子的作者,它是关于 Kafka Streams 的——尽管如此,底层的偏移操作思想是相同的)

关于java - 如何重置 Kafka 偏移量以匹配尾部位置?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40271847/

相关文章:

java - 尝试使用instanceof比较来迭代Class[]

java - 在本地运行 Storm 拓扑时出错

java - Kafka 9 KafkaConsumer multiple 似乎没有并行处理消息

maven - NoSuchMethodError : DefaultKryoFactory - while trying to run a storm-starter project topology 错误

java - Storm : eventhub spout stops receiving messages

java - KafkaSpout 工作示例

绘制图像时 Java Headless 异常

java - 另一个 Ant + JUnit 类路径问题

java - 如何在 smack 中设置我的名字?

java - Zookeeper 属性仍然是 Kafka 消费者的一个要求吗?