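Kafka version: 0.9.0.1
If n = 20, I need to fetch the last 20 messages of a topic.
I tried
kafkaConsumer.seekToBeginning();
but that retrieves all of the messages. I only need the last 20.
The topic may hold hundreds of thousands of records.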
public List<JSONObject> consumeMessages(String kafkaTopicName) {
    KafkaConsumer<String, String> kafkaConsumer = null;
    boolean flag = true;
    List<JSONObject> messagesFromKafka = new ArrayList<>();
    int recordCount = 0;
    int i = 0;
    int maxMessagesToReturn = 20;
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "project.group.id");
    props.put("max.partition.fetch.bytes", "1048576000");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    kafkaConsumer = new KafkaConsumer<>(props);
    kafkaConsumer.subscribe(Arrays.asList(kafkaTopicName));
    TopicPartition topicPartition = new TopicPartition(kafkaTopicName, 0);
    LOGGER.info("Subscribed to topic " + kafkaConsumer.listTopics());
    while (flag) {
        // will consume all the messages and store in records
        ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
        kafkaConsumer.seekToBeginning(topicPartition);
        // getting total records count
        recordCount = records.count();
        LOGGER.info("recordCount " + recordCount);
        for (ConsumerRecord<String, String> record : records) {
            if (record.value() != null) {
                if (i >= recordCount - maxMessagesToReturn) {
                    // adding last 20 messages to messagesFromKafka
                    LOGGER.info("kafkaMessage " + record.value());
                    messagesFromKafka.add(new JSONObject(record.value()));
                }
                i++;
            }
        }
        if (recordCount > 0) {
            flag = false;
        }
    }
    kafkaConsumer.close();
    return messagesFromKafka;
}
Best answer
You can use kafkaConsumer.seekToEnd(Collection<TopicPartition> partitions) to seek to the last offset of the given partitions. According to the documentation:
"Seek to the last offset for each of the given partitions. This function evaluates lazily, seeking to the final offset in all partitions only when poll(Duration) or position(TopicPartition) are called. If no partitions are provided, seek to the final offset for all of the currently assigned partitions."
Then you can use position(TopicPartition partition) to retrieve the position for a specific partition.
Then subtract 20 from it and use kafkaConsumer.seek(TopicPartition partition, long offset) to position the consumer at the 20 most recent messages.
Put simply:
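// Move to the end of the partition (evaluated lazily, on the next position() or poll()).
// Note: on 0.9.x clients seekToEnd takes varargs, e.g. seekToEnd(topicPartition).
kafkaConsumer.seekToEnd(partitionList);
long endPosition = kafkaConsumer.position(topicPartition);
// Step back by the number of messages wanted and seek there
long recentMessagesStartPosition = endPosition - maxMessagesToReturn;
kafkaConsumer.seek(topicPartition, recentMessagesStartPosition);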
Now you can use poll() to retrieve the 20 most recent messages.
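One edge case the subtraction above does not cover: when the partition holds fewer than 20 messages, endPosition - 20 points before the first available offset and can even go negative. Here is a minimal sketch of the clamping in plain Java, with no Kafka dependency. RecentOffsets and startOffset are hypothetical names, and beginningOffset is assumed to have been obtained separately, for example via seekToBeginning followed by position.

```java
public class RecentOffsets {

    /**
     * Returns the offset to seek to so that reading up to endOffset (exclusive)
     * covers at most n offsets, without going before beginningOffset.
     */
    public static long startOffset(long beginningOffset, long endOffset, int n) {
        if (n < 0) {
            throw new IllegalArgumentException("n must be non-negative");
        }
        return Math.max(beginningOffset, endOffset - n);
    }

    public static void main(String[] args) {
        System.out.println(startOffset(0, 250_000, 20)); // 249980
        System.out.println(startOffset(0, 7, 20));       // 0 (only 7 messages exist)
        System.out.println(startOffset(100, 110, 20));   // 100 (older messages already deleted)
    }
}
```

The result would be what gets passed to kafkaConsumer.seek(topicPartition, ...). On a compacted topic the offsets can have gaps, so the range may still contain fewer than n messages.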
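The logic is simple, but if the topic has multiple partitions you have to account for that as well. I haven't tried this myself, but I hope it conveys the idea.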
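For the multi-partition case, Kafka only orders messages within a partition, so "the last n messages of the topic" needs an ordering rule of its own. One possible approach, sketched below under assumptions: read the last n messages from each partition as described above, then merge them and keep the n newest by timestamp. Msg is a hypothetical stand-in for a consumed record, not a Kafka class, and the timestamp is assumed to be available, for example from the message payload (record timestamps are not part of the 0.9 client).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class LastNAcrossPartitions {

    /** Hypothetical stand-in for a consumed record; not a Kafka class. */
    public static final class Msg {
        public final int partition;
        public final long offset;
        public final long timestamp; // assumed to come from the payload
        public final String value;

        public Msg(int partition, long offset, long timestamp, String value) {
            this.partition = partition;
            this.offset = offset;
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    /**
     * Merges the per-partition tails and keeps the n newest messages by
     * timestamp (ties broken by partition, then offset), oldest first.
     */
    public static List<Msg> lastN(List<List<Msg>> tails, int n) {
        if (n < 0) {
            throw new IllegalArgumentException("n must be non-negative");
        }
        List<Msg> all = new ArrayList<>();
        for (List<Msg> tail : tails) {
            all.addAll(tail);
        }
        all.sort(Comparator.comparingLong((Msg m) -> m.timestamp)
                .thenComparingInt(m -> m.partition)
                .thenComparingLong(m -> m.offset));
        int from = Math.max(0, all.size() - n);
        return new ArrayList<>(all.subList(from, all.size()));
    }

    public static void main(String[] args) {
        List<Msg> p0 = Arrays.asList(
                new Msg(0, 5, 10, "a"), new Msg(0, 6, 30, "c"), new Msg(0, 7, 50, "e"));
        List<Msg> p1 = Arrays.asList(
                new Msg(1, 2, 20, "b"), new Msg(1, 3, 40, "d"));
        for (Msg m : lastN(Arrays.asList(p0, p1), 3)) {
            System.out.println(m.value); // prints c, d, e on separate lines
        }
    }
}
```

Reading n messages from every partition is more than strictly needed, but it guarantees that the n newest overall are among the candidates whichever partition they landed in.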
Regarding "java - Retrieve the last n messages of a Kafka consumer from a particular topic", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/55744667/