java - 如何获取kafka偏移量数据,在时间戳上指定

标签 java apache-kafka kafka-consumer-api

当我尝试运行它时,我尝试根据时间戳从 Kafka 主题获取偏移量,但抛出空指针错误,

Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
              for (TopicPartition partition : partitions) {
                timestampsToSearch.put(partition,  startTimestamp);
              }
Map<TopicPartition, OffsetAndTimestamp> outOffsets = consumer.offsetsForTimes(timestampsToSearch);
              for (TopicPartition partition : partitions) {
                Long seekOffset = outOffsets.get(partition).offset();
consumer.seek(partition, seekOffset);

任何帮助将不胜感激。

最佳答案

要查找与时间戳相对应的偏移量,您需要使用 offsetsForTimes()方法。

例如,这将打印 mytopic 的分区 0 对应于 1 秒前的偏移量:

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);) {
    Map<TopicPartition, Long> timestamps = new HashMap<>();
    timestamps.put(new TopicPartition("mytopic", 0), System.currentTimeMillis()-1*1000);
    Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(timestamps);
    System.err.println(offsets);
}

这将显示如下内容:

{offset-test-0=(timestamp=1561469319192, leaderEpoch=0, offset=100131)}

关于java - 如何获取kafka偏移量数据,在时间戳上指定,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56754704/

相关文章:

spring - Kafka 消费者没有正确提交偏移量

java - 使用随机类的首选方式

java - 父类(super class)对象的子类引用

java - 如何将创建的对象传递给另一个Java类

java - 嵌入式 jetty 的多个实例

apache-kafka - Kafka 中的内部和外部通信

apache-kafka - Kafka 消费者 - 民意调查行为

docker - 无法使用Confluent docker部署Debezium mysql连接器

apache-kafka - 如何让 Kafka 消费者订阅新分区

apache-kafka - 为什么kafka集群中的单节点多代理不受欢迎?