我尝试根据 timetsamp 检索偏移量,但是当我运行代码时,它在此处抛出空指针错误“LongeekOffset = outOffsets.get(partition).offset();”因为特定时间没有偏移量我的问题是如何获得该特定时间戳上最接近的偏移量
下面的代码我尝试过,
Long startTimestamp=Instant.now().minus (10, ChronoUnit.MINUTES ).toEpochMilli();
Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
for (TopicPartition partition : partitions) {
timestampsToSearch.put(partition, startTimestamp);
}
Map<TopicPartition, OffsetAndTimestamp> outOffsets = consumer.offsetsForTimes(timestampsToSearch);
for (TopicPartition partition : partitions) {
Long seekOffset = outOffsets.get(partition).offset();
consumer.seek(partition, seekOffset);
任何帮助将不胜感激!提前致谢
最佳答案
offsetsForTimes 方法返回时间戳大于或等于目标时间戳的第一条消息的偏移量。如果为空则没有这样的消息。在这种情况下,只需将消费者定位到最后即可。
for (TopicPartition partition : partitions) {
OffsetAndTimestamp seekOffset = outOffsets.get(partition);
if(seekOffset!=null){
consumer.seek(partition, seekOffset.offset());
}else{
consumer.seekToEnd(Collections.singleton(partition));
}
}
关于java - 根据时间戳检索kafka主题中的偏移量,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56783876/