我正在使用 Kafka 监听器来接收消息。我已启用手动确认。我已使用 Acknowledgment.acknowledge() 方法在成功时提交偏移量。由于某些API调用失败而引发异常的情况,我没有提交。
我有点困惑,因为除非重新启动应用程序,否则未提交的消息不会再次被消耗。我是卡夫卡新手,所以我确信我错过了一些非常基本的东西,但搜索对此是徒劳的。 有人可以解释一下或指出我可以知道这里出了什么问题的正确来源吗?
@KafkaListener(topics = "temp_topic", groupId = "temp-consumer", containerFactory = "tempListenerContainerFactory")
public void receive(ConsumerRecord<?, ?> consumerRecord,
Acknowledgment acknowledgment) {
try {
log.info("Consuming message : {} ", consumerRecord.value().toString());
String message = consumerRecord.value().toString();
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
Payload payload = objectMapper.readValue(message,Payload.class);
tempService.processEvent(payload);
log.info("Message: {} successfully consumed",message);
acknowledgment.acknowledge();
} catch (Exception e) {
log.error("Error while listening to kafka event: ", e);
}
}
提前谢谢您。
最佳答案
确认不会改变消费者实际阅读的时间点。
它仅向某些存储(带有偏移量的zookeeper或kafka主题)提交偏移量,而不会对消费者跟踪的偏移量进行任何更改 - 例如,您可能不会提交当前偏移量并通过对下一个偏移量进行确认来覆盖它。
您可以尝试使用一些方法来处理失败的消息:
- 您可以为特定类型的失败(例如超时、反序列化错误)做好准备 - 在监听器内捕获它们并采取正确的操作(例如等待/重试或使应用程序失败 - 取决于问题的性质)。
- 您可以跳过失败的消息 - 在监听器内捕获错误,将它们放入失败消息的存储中(请参阅死信队列),然后对它们进行确认。
关于java - 未提交的消息不会再次被消耗,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50476204/