java - 根据时间戳检索kafka主题中的偏移量

标签 java apache-kafka kafka-consumer-api

我尝试根据 timetsamp 检索偏移量,但是当我运行代码时,它在此处抛出空指针错误“LongeekOffset = outOffsets.get(partition).offset();”因为特定时间没有偏移量我的问题是如何获得该特定时间戳上最接近的偏移量

下面的代码我尝试过,


Long startTimestamp=Instant.now().minus (10, ChronoUnit.MINUTES ).toEpochMilli();

Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
              for (TopicPartition partition : partitions) {
                timestampsToSearch.put(partition,  startTimestamp);
              }
Map<TopicPartition, OffsetAndTimestamp> outOffsets = consumer.offsetsForTimes(timestampsToSearch);
              for (TopicPartition partition : partitions) {
                Long seekOffset = outOffsets.get(partition).offset();
consumer.seek(partition, seekOffset); 

任何帮助将不胜感激!提前致谢

最佳答案

offsetsForTimes 方法返回时间戳大于或等于目标时间戳的第一条消息的偏移量。如果为空则没有这样的消息。在这种情况下,只需将消费者定位到最后即可。

for (TopicPartition partition : partitions) {
          OffsetAndTimestamp seekOffset = outOffsets.get(partition);
          if(seekOffset!=null){
             consumer.seek(partition, seekOffset.offset()); 
          }else{
             consumer.seekToEnd(Collections.singleton(partition));
          }
}

关于java - 根据时间戳检索kafka主题中的偏移量,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56783876/

相关文章:

java - JSP 打印主机名

java - Kafka Streams API 中的 ArrayList Serde 问题

apache-kafka - 由于消费者速度慢,Kafka 重新平衡主题中的数据

java - 在 Kafka 重新平衡中,撤销操作是否会等待消费进程完成?

Java 图形用户界面库

java - 错误 :Kotlin:Out-projected type 'JComboBox<*>' prohibits the use of '@BeanProperty public open fun setModel(p0: ComboBoxModel<E! >!)

spring-boot - KafkaTemplate 线程安全吗

java - Kafka - 如何同时使用 filter 和 filternot?

apache-spark - 如何在 Spark Streaming DirectAPI 中同时读取每个 Kafka 分区

java - 栏目主要顺序