我正在使用Spring Kafka 1.2.2.RELEASE。我有一个Kafka Listener作为使用者,它可以监听主题并以弹性方式索引文档。
我的自动提交偏移量属性设置为true//默认值。
我的印象是,如果监听器中存在异常(弹性下降),则不应提交偏移,并且应为下一次轮询处理相同的消息
但是这没有发生,消费者在下一次轮询时提交偏移量。阅读文章和文档后,我了解到这种情况是将auto commit设置为true到下一次轮询将提交所有偏移量
我的疑问是,消费者为什么要调用下一个民意调查,以及如何防止将auto commit设置为true的任何偏移量提交,还是需要将此属性设置为false并手动提交。
最佳答案
我更喜欢将其设置为false;容器为您管理偏移量更为可靠。
将容器的AckMode
设置为RECORD
(默认为BATCH
),容器将在监听器返回后为您提交偏移量。
还考虑升级到至少1.3.3(当前版本为2.1.4);由于有了KIP-62,1.3.x引入了更为简单的线程模型
编辑
使用自动提交,无论成功/失败,都将提交偏移量。除非ackOnError
为true(另一个不使用自动提交的原因),否则容器在失败后将不会提交。
但是,这仍然无济于事,因为经纪人不会再次发送相同的记录。为此,您必须在Consumer
上执行查找操作。
在2.0.1(当前版本为2.1.4)中,我们添加了SeekToCurrentErrorHandler
,它将导致在下一次轮询中重新发送失败和未处理的记录。 See the reference manual。
您还可以使用ConsumerAwareListener
自己执行查找(也在2.0中添加)。
对于较旧的版本(> = 1.1),您必须使用is quite a bit more complicated的ConsumerSeekAware
监听器。
另一种选择是添加retry,以便根据重试设置重新尝试传递。
关于如果发生故障,Spring Kafka自动提交偏移,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49404156/