java - 有没有办法从Kafka主题中获取最后一条消息?

标签 java apache-kafka spring-kafka

我有一个带有多个分区的 Kafka 主题,我想知道 Java 中是否有办法获取该主题的最后一条消息。我不关心我只想获取最新消息的分区。

我试过 @KafkaListener但它仅在主题更新时获取消息。如果在应用程序打开后没有发布任何内容,则不会返回任何内容。

也许听众根本就不是解决问题的正确方法?

最佳答案

以下代码段对我有用。你可以试试这个。评论中的解释。

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singletonList(topic));

        consumer.poll(Duration.ofSeconds(10));

        consumer.assignment().forEach(System.out::println);

        AtomicLong maxTimestamp = new AtomicLong();
        AtomicReference<ConsumerRecord<String, String>> latestRecord = new AtomicReference<>();

        // get the last offsets for each partition
        consumer.endOffsets(consumer.assignment()).forEach((topicPartition, offset) -> {
            System.out.println("offset: "+offset);

            // seek to the last offset of each partition
            consumer.seek(topicPartition, (offset==0) ? offset:offset - 1);

            // poll to get the last record in each partition
            consumer.poll(Duration.ofSeconds(10)).forEach(record -> {

                // the latest record in the 'topic' is the one with the highest timestamp
                if (record.timestamp() > maxTimestamp.get()) {
                    maxTimestamp.set(record.timestamp());
                    latestRecord.set(record);
                }
            });
        });
        System.out.println(latestRecord.get());

关于java - 有没有办法从Kafka主题中获取最后一条消息?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57722688/

相关文章:

apache-kafka - Kafka 流与 Kafka 消费者如何决定使用什么

scala - Kafka - 为什么在将 AUTO_OFFSET_RESET_CONFIG 设置为 "latest"时,新的 groupId 不返回主题中的所有消息

java - 这是 Spring-Kafka 文档中有关 BatchErrorHandler 的错误吗?

java - 无法构建cordova项目

java - 生成随机测试用例

elasticsearch - 从Kafka流到Elasticsearch时的主题映射

java - 使用 Spring-Kafka-2.3.0 及更高版本使用主题中的消息并以指数退避重试,直到成功为止

apache-kafka - 断开连接后,Kafka消费者重新连接

java - 拆分数组列表并再次将它们合并回来

java - 将私钥从 PuTTY 格式转换为 OpenSSH 格式后,是否需要向服务器添加新的公钥?