java - 使用偏移量在 Kafka 中回溯过去

标签 java apache-kafka

有没有办法使用我们传递的初始属性从特定偏移量启动消费者

我知道有 props.put("auto.offset.reset", "earliest") 但这让我开始了。

但是我想回去,我的情况如下

  1. 指定我想要开始的偏移量
  2. 指定我要开始的时间

我想使用初始属性作为首选选项来做到这一点。 如果不可能,则使用其他机制

附上我的简单消费者代码以供引用

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SimpleConsumer {

    public static void main(String[] args) throws Exception {

        String topicName = "test3";
        Properties props = new Properties();

        String groupId = "single";

        // Kafka consumer configuration settings
        props.put("bootstrap.servers", "mymachine:9092");
        props.put("group.id", groupId);
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "earliest");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList(topicName));

        System.out.println("Starting the _NON-BATCH_ consumer ::: Topic=" + topicName+" GroupId="+groupId);

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("%s   (offset:%d, key:%s, partition = %s, topic = %s)", record.value(), record.offset(), record.key(), record.partition(), record.topic());
                System.out.println();
            }
        }
    }
}

最佳答案

对于场景1,您可以使用KafkaConsumer.seek(TopicPartition, offset)指定读取的偏移量。

对于场景2,Kafka 0.10.1.0提供了KafkaConsumer.offsetsForTimes方法,允许您通过时间戳查找给定分区的偏移量,然后调用seek()方法来检索您想要的消息。

关于java - 使用偏移量在 Kafka 中回溯过去,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41138539/

相关文章:

Java - 整数输入的累积和在 while 循环中不起作用

java - 无法遍历 JSON 数组

java - Android Sharedpreferences 传递变量值

java - 如何使用java修改一个kafka主题的消息并将其发送到另一个kafka主题?

ssl - Kafka安全实现问题SASL SSL和SCRAM

scala - 如何在 Scala 中编写 Kafka Producer

java - 从 Rest Web 服务返回 List<String>

java - 如何检查 BufferedReader 中是否有数据 10 秒

amazon-s3 - 从 Oracle 表流式传输到 Redshift

apache-kafka - 如何在 Kafka Sink Connector 中手动提交偏移量