java - 为什么在Kafka中消费消息时offset没有更新

标签 java apache-kafka kafka-consumer-api

我正在实现 Kafka 消费者类来接收消息。我只想每次都只收到新消息。因此,我将 enable.auto.commit 设置为 true。然而,偏移量似乎根本没有改变。尽管主题、消费者组和分区始终相同。

这是我的消费者代码:

    consumerConfig.put("bootstrap.servers", bootstrap);
    consumerConfig.put("group.id", KafkaTestConstants.KAFKA_GROUP);
    consumerConfig.put("enable.auto.commit", "true");
    consumerConfig.put("auto.offset.reset", "earliest");
    consumerConfig.put("auto.commit.interval", 1000);
    consumerConfig.put("key.deserializer", StringDeserializer.class.getName());
    consumerConfig.put("value.deserializer", StringDeserializer.class.getName());
    StringDeserializer deserializer = new StringDeserializer();
    KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(consumerConfig, deserializer, deserializer);

    TopicPartition tp = new TopicPartition(KafkaTestConstants.KAFKA_TOPIC, 0);
    List<TopicPartition> tps = Arrays.asList(tp);
    kafkaConsumer.assign(tps);
    long offset = kafkaConsumer.position(tp);
    System.out.println("Offset of partition: " + offset);

    List<String> messages = new ArrayList<String>();
    ConsumerRecords<String, String> records = kafkaConsumer.poll(100);

    for (ConsumerRecord<String, String> record : records) {
        System.out.println("Message received: " + record.value());
        messages.add(record.value());
    }

    consumer.commitAsync();
    System.out.println("Offset commited.\n");
    consumer.close();

无论我运行多少次,它总是显示偏移量为0。因此,它总是从头开始接收所有消息。我错过了什么?

编辑:根据 Matthias 的回答,我决定手动提交偏移量。但是 commitSync() 会挂起。 commitAsync() 之类的作品。稍后我将解释“某种”。以下是代码的作用:

producer send 2 messages;
consumer initiates;
print out current position;
consumer.poll();
print received messages;
consumer.commitAsync();

这就是这段代码的行为方式。假设我有 100 条消息。现在生产者发送了 2 条新消息。在消费者轮询之前,它会显示当前偏移位置为102,而应该是100。因此,不会打印出新消息。这几乎就像生产者发送消息后更新了偏移量。

最佳答案

自动提交仅在您使用消费者组管理时才有效,为此,您需要“订阅”主题,但不能手动“分配”分区。

比较 KafkaConsumer 的 JavaDocs。这是一篇很长的文章,但需要了解如何正确使用消费者的微妙细节:https://kafka.apache.org/0102/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html

此外,如果启用自动提交,它将在 poll 内提交(即,调用 poll() 可能会将上一次调用返回的消息提交给poll()),而不是当您迭代返回消息时。这也意味着,您的提交将向前“跳转”,例如从提交的偏移量 0 到 100(如果您通过轮询为单个分区收到 100 条消息)。

关于java - 为什么在Kafka中消费消息时offset没有更新,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42701394/

相关文章:

java - Like 子句中的 MySQLSyntaxErrorException 错误

java/android NullPointerException 在字符串上抛出

apache-kafka - 高效的 MQTT 代理到 Kafka 代理桥接

java - 有没有一种有效的方法可以外部连接多个(超过 2 个)kafka 主题?

java - 如何从 Filter 类获取 liferay 远程用户

java - 将数组从最高到最低排序

concurrency - 在 Kafka Stream 中执行异步转换

apache-kafka - Spring Kafka 分区

asp.net-web-api - .Net Core 中的 Kafka 消费者

java - Kafka对同一主题的consumer数量有限制吗?