我正在尝试根据时间戳重播 Kafka 主题数据。我能够从起始位置查找数据,但随后我需要检查每个分区中记录的结束时间戳。当我尝试将记录的时间戳与结束时间戳进行比较时,循环在第一个分区处中断。
for(TopicPartition partition : records.partitions() ) {
for (ConsumerRecord<Long, String> record : records) {
if (record.timestamp() >= endtimeStamp) {
consumer.seektoend(partition);
break;
}
}
}
最佳答案
您的循环将始终在第一个分区期间中断(如果至少有一条记录的 timestamp() >= endtimeStamp ),因为对于每个分区,您都会迭代所有记录。
for (ConsumerRecord<Long, String> record : records) {
if (record.timestamp() >= endtimeStamp) {
consumer.seektoend(partition);
break;
}
}
==> 在这里,您将执行所有记录,无论分区是什么。您将一次又一次地调用同一分区的seekToEnd()
雅尼克
关于java - 如何根据时间戳重播kafka主题中的消息?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57363956/