有没有办法使用我们传递的初始属性从特定偏移量启动消费者
我知道有 props.put("auto.offset.reset", "earliest") 但这让我开始了。
但是我想回去,我的情况如下
- 指定我想要开始的偏移量
- 指定我要开始的时间
我想使用初始属性作为首选选项来做到这一点。 如果不可能,则使用其他机制
附上我的简单消费者代码以供引用
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
public class SimpleConsumer {
public static void main(String[] args) throws Exception {
String topicName = "test3";
Properties props = new Properties();
String groupId = "single";
// Kafka consumer configuration settings
props.put("bootstrap.servers", "mymachine:9092");
props.put("group.id", groupId);
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset", "earliest");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList(topicName));
System.out.println("Starting the _NON-BATCH_ consumer ::: Topic=" + topicName+" GroupId="+groupId);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("%s (offset:%d, key:%s, partition = %s, topic = %s)", record.value(), record.offset(), record.key(), record.partition(), record.topic());
System.out.println();
}
}
}
}
最佳答案
对于场景1,您可以使用KafkaConsumer.seek(TopicPartition, offset)指定读取的偏移量。
对于场景2,Kafka 0.10.1.0提供了KafkaConsumer.offsetsForTimes方法,允许您通过时间戳查找给定分区的偏移量,然后调用seek()方法来检索您想要的消息。
关于java - 使用偏移量在 Kafka 中回溯过去,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41138539/