apache-kafka - Kafka 消费者不是从最新消息开始

标签 apache-kafka kafka-consumer-api

我想要一个从主题中的最新消息开始的 Kafka 消费者。

这是Java代码:

private static Properties properties = new Properties();
private static KafkaConsumer<String, String> consumer;
static
{
    properties.setProperty("bootstrap.servers","localhost");
    properties.setProperty("enable.auto.commit", "true");
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties.setProperty("group.id", "test");
    properties.setProperty("auto.offset.reset", "latest");
    consumer = new KafkaConsumer<>(properties);

    consumer.subscribe(Collections.singletonList("mytopic"));
}

@Override
public StreamHandler call() throws Exception
{
    while (true) 
    {
        ConsumerRecords<String, String> consumerRecords = consumer.poll(200);
        Iterable<ConsumerRecord<String, String>> records = consumerRecords.records("mytopic");
        for(ConsumerRecord<String, String> rec : records)
        {
            System.out.println(rec.value());
        }
    }
}

虽然 auto.offset.reset 的值是 最新 ,但消费者开始形成属于 2 天前的消息,然后它会 catch 最新的消息。

我错过了什么?

最佳答案

您之前是否使用相同的 group.id 运行过相同的代码? ? auto.offset.reset参数仅在没有为您的消费者存储的现有偏移量时才使用。所以如果你之前运行过这个例子,比如说两天前,然后你再次运行它,它将从最后一个消耗的位置开始。

使用 seekToEnd()如果您想手动转到主题的末尾。

https://stackoverflow.com/a/32392174/1392894对此进行更彻底的讨论。

关于apache-kafka - Kafka 消费者不是从最新消息开始,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43945927/

相关文章:

scala - 值尾不是(String,String)的成员

apache-kafka - 使用 Kafka 进行一对一和群组消息传递

spring-boot - 无法在 Spring-Boot 应用程序中使用 Spring-integration-kafka 禁用 Kafka 消息的手动提交

java - Apache 卡夫卡 : Processor generates multiple output per input

java - Kafka 一个消费者一个partition

java - 从 0.7 升级到 0.8.1.1 后生成嵌入式 kafka 队列时出错

python - 构建天气预报实时数据管道

apache-kafka - 如何最小化kafka消息传递框架中涉及的延迟?

docker - 无法使用 Docker 容器内的 Kafka 代理运行控制台使用者

apache-kafka - 如何从read_committed Kafka Consumer获取上次提交的偏移量