java - 如何根据时间戳重播kafka主题中的消息?

标签 java apache-kafka kafka-consumer-api

我正在尝试根据时间戳重播 Kafka 主题数据。我能够从起始位置查找数据,但随后我需要检查每个分区中记录的结束时间戳。当我尝试将记录的时间戳与结束时间戳进行比较时,循环在第一个分区处中断。

for(TopicPartition partition : records.partitions() ) {
  for (ConsumerRecord<Long, String> record : records) {
    if (record.timestamp() >= endtimeStamp) {
      consumer.seektoend(partition);
      break;
    }
  }
}

最佳答案

您的循环将始终在第一个分区期间中断(如果至少有一条记录的 timestamp() >= endtimeStamp ),因为对于每个分区,您都会迭代所有记录。

for (ConsumerRecord<Long, String> record : records) {
    if (record.timestamp() >= endtimeStamp) {
      consumer.seektoend(partition);
      break;
    }
  }

==> 在这里,您将执行所有记录,无论分区是什么。您将一次又一次地调用同一分区的seekToEnd()

雅尼克

关于java - 如何根据时间戳重播kafka主题中的消息?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57363956/

相关文章:

java - 保留对 Locale 实例的静态引用好吗?

java - 代码以不同顺序执行,Jframe、Button 和 TextArea

apache-kafka - KSQL WARN警告: window end time was truncated to Long. MAX是什么意思?

apache-kafka - 尝试删除Kafka中的消费者组时出现GroupNotEmptyException

java - 卡夫卡控制台消费者。错误 无法建立与节点 0 的连接。代理可能不可用

java - Java Kafka Consumer 在多线程中的使用

java - Web 应用程序的浏览器问题

java - 更新sql java 不起作用

c - RDKafka 客户端无法访问 Kafka 代理

java - 如何处理 OffsetOutOfRangeException 错误?