java - 从特定主题中检索 Kafka 消费者的最后 n 条消息

标签 java apache-kafka kafka-consumer-api

卡夫卡版本:0.9.0.1

如果n = 20, 我必须获取某个主题的最后 20 条消息。

我试过

kafkaConsumer.seekToBeginning();

但它会检索所有消息。我只需要获取最后 20 条消息。

这个主题可能有几十万条记录

public List<JSONObject> consumeMessages(String kafkaTopicName) {
  KafkaConsumer<String, String> kafkaConsumer = null;
  boolean flag = true;
  List<JSONObject> messagesFromKafka = new ArrayList<>();
  int recordCount = 0;
  int i = 0;
  int maxMessagesToReturn = 20;

  Properties props = new Properties();         
  props.put("bootstrap.servers", "localhost:9092");
  props.put("group.id", "project.group.id");
  props.put("max.partition.fetch.bytes", "1048576000");
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  kafkaConsumer = new KafkaConsumer<>(props);

  kafkaConsumer.subscribe(Arrays.asList(kafkaTopicName));
  TopicPartition topicPartition = new TopicPartition(kafkaTopicName, 0);
  LOGGER.info("Subscribed to topic " + kafkaConsumer.listTopics());
  while (flag) {
    // will consume all the messages and store in records
    ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
    kafkaConsumer.seekToBeginning(topicPartition);

    // getting total records count
    recordCount = records.count();
    LOGGER.info("recordCount " + recordCount);
    for (ConsumerRecord<String, String> record : records) {
      if(record.value() != null) {
        if (i >= recordCount - maxMessagesToReturn) {
          // adding last 20 messages to messagesFromKafka
          LOGGER.info("kafkaMessage "+record.value());
          messagesFromKafka.add(new JSONObject(record.value()));
        }
        i++;
      }
    }
    if (recordCount > 0) {
      flag = false;
    }
  }
  kafkaConsumer.close();
  return messagesFromKafka;
}

最佳答案

您可以使用 kafkaConsumer.seekToEnd(Collection<TopicPartition> partitions)寻找给定分区的最后一个偏移量。根据文档:

"Seek to the last offset for each of the given partitions. This function evaluates lazily, seeking to the final offset in all partitions only when poll(Duration) or position(TopicPartition) are called. If no partitions are provided, seek to the final offset for all of the currently assigned partitions."

然后您可以使用 position(TopicPartition partition) 检索特定分区的位置.

然后你可以从中减少 20,并使用 kafkaConsumer.seek(TopicPartition partition, long offset)获取最近的 20 条消息。

简单地说,

kafkaConsumer.seekToEnd(partitionList);
long endPosition = kafkaConsumer.position(topicPartiton);
long recentMessagesStartPosition = endPosition - maxMessagesToReturn;
kafkaConsumer.seek(topicPartition, recentMessagesStartPosition);

现在您可以使用 poll() 检索最近的 20 条消息

这是一个简单的逻辑,但是如果你有多个分区,你也必须考虑这些情况。我没有尝试这个,但希望你能理解这个概念。

关于java - 从特定主题中检索 Kafka 消费者的最后 n 条消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55744667/

相关文章:

java - Apache kafka - 手动确认(AbstractMessageListenerContainer.AckMode.MANUAL)不起作用并且在库升级时重播事件

apache-kafka - 如何在kafka中定义多个序列化器?

java - 多次调用消费者时如何重置Kafka消费者偏移量

apache-kafka - java.lang.ClassNotFoundException : kafka. api.OffsetRequest

java - 以编程方式配置 Log4j 记录器

java - 使用OpenCV-3.1.0时,如何获取OpenCV-2x提供的 "Highgui.imencode()"的功能?

stream - Apache Kafka Streams 将 KTables 物化为主题似乎很慢

apache-kafka - Spring kafka消费者@retryabletopic无限重试

java - Java 类名中的有效字符

java - spring data mongo - 没有定义名为 'mongoTemplate' 的 bean