如果发生故障,Spring Kafka自动提交偏移

标签 spring apache-kafka spring-kafka

我正在使用Spring Kafka 1.2.2.RELEASE。我有一个Kafka Listener作为使用者,它可以监听主题并以弹性方式索引文档。
我的自动提交偏移量属性设置为true//默认值。

我的印象是,如果监听器中存在异常(弹性下降),则不应提交偏移,并且应为下一次轮询处理相同的消息

但是这没有发生,消费者在下一次轮询时提交偏移量。阅读文章和文档后,我了解到这种情况是将auto commit设置为true到下一次轮询将提交所有偏移量

我的疑问是,消费者为什么要调用下一个民意调查,以及如何防止将auto commit设置为true的任何偏移量提交,还是需要将此属性设置为false并手动提交。

最佳答案

我更喜欢将其设置为false;容器为您管理偏移量更为可靠。

将容器的AckMode设置为RECORD(默认为BATCH),容器将在监听器返回后为您提交偏移量。

还考虑升级到至少1.3.3(当前版本为2.1.4);由于有了KIP-62,1.3.x引入了更为简单的线程模型

编辑

使用自动提交,无论成功/失败,都将提交偏移量。除非ackOnError为true(另一个不使用自动提交的原因),否则容器在失败后将不会提交。

但是,这仍然无济于事,因为经纪人不会再次发送相同的记录。为此,您必须在Consumer上执行查找操作。

在2.0.1(当前版本为2.1.4)中,我们添加了SeekToCurrentErrorHandler,它将导致在下一次轮询中重新发送失败和未处理的记录。 See the reference manual

您还可以使用ConsumerAwareListener自己执行查找(也在2.0中添加)。

对于较旧的版本(> = 1.1),您必须使用is quite a bit more complicatedConsumerSeekAware监听器。

另一种选择是添加retry,以便根据重试设置重新尝试传递。

关于如果发生故障,Spring Kafka自动提交偏移,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49404156/

相关文章:

java - 如何从 Spring Security 扩展用户类并使用 JPA 创建表

java - MVC 应用程序中的服务是否应返回实体或实体列表

java - 我如何在java中对这个方法进行单元测试?

python - KafkaTimeoutError : Failed to update metadata after 60. 0 秒

javascript - 测试框架意外退出 - Karate 测试

java - Spring 批处理-Kafka : KafkaItemReader reads the data ALWAYS from beginning

java - 如何在 Spring Boot 应用程序中设置 Tomcat 的 "reloadable"标志?

Spring Kafka Producer 不发送到 Kafka 1.0.0(Magic v1 不支持记录头)

java - Spring Kafka Embedded - 测试之间已经存在主题

spring - 使用 Spring Kafka 添加自定义 header