java - 未提交的消息不会再次被消耗

标签 java apache-kafka

我正在使用 Kafka 监听器来接收消息。我已启用手动确认。我已使用 Acknowledgment.acknowledge() 方法在成功时提交偏移量。由于某些API调用失败而引发异常的情况,我没有提交。

我有点困惑,因为除非重新启动应用程序,否则未提交的消息不会再次被消耗。我是卡夫卡新手,所以我确信我错过了一些非常基本的东西,但搜索对此是徒劳的。 有人可以解释一下或指出我可以知道这里出了什么问题的正确来源吗?

 @KafkaListener(topics = "temp_topic", groupId = "temp-consumer", containerFactory = "tempListenerContainerFactory")
public void receive(ConsumerRecord<?, ?> consumerRecord,
                                   Acknowledgment acknowledgment) {
    try {
        log.info("Consuming message : {} ", consumerRecord.value().toString());
        String message = consumerRecord.value().toString();
         objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
         Payload payload = objectMapper.readValue(message,Payload.class);
         tempService.processEvent(payload);
         log.info("Message: {} successfully consumed",message);
         acknowledgment.acknowledge();
    } catch (Exception e) {
        log.error("Error while listening to kafka event: ", e);
    }
}

提前谢谢您。

最佳答案

确认不会改变消费者实际阅读的时间点。
它仅向某些存储(带有偏移量的zookeeper或kafka主题)提交偏移量,而不会对消费者跟踪的偏移量进行任何更改 - 例如,您可能不会提交当前偏移量并通过对下一个偏移量进行确认来覆盖它。

您可以尝试使用一些方法来处理失败的消息:

  1. 您可以为特定类型的失败(例如超时、反序列化错误)做好准备 - 在监听器内捕获它们并采取正确的操作(例如等待/重试或使应用程序失败 - 取决于问题的性质)。
  2. 您可以跳过失败的消息 - 在监听器内捕获错误,将它们放入失败消息的存储中(请参阅死信队列),然后对它们进行确认。

关于java - 未提交的消息不会再次被消耗,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50476204/

相关文章:

json - druid kafka 索引服务设置

kotlin - 注册 Avro 架构 : "string" RestClientException: Schema being registered is incompatible with an earlier schema; 时出错

apache-kafka - Apache Kafka 偏移量是如何生成的?

java - 如何使用 Apache Beam (KafkaIO) 反序列化 avro 数据

java - Apereo CAS 5.3.x : how to get logged user, 来自 CAS 服务器,在 java cas 客户端登录后

Java BigInteger,切断最后一位数字

java - 检索添加了用户输入的网站

java - 如何在 ApplicationRunner 中正确访问安全的 Spring Data REST 存储库?

java - 如何在 SWT 中重用图像

elasticsearch - apache flink、elasticsearch 和 kafka 集成的版本