我有一个 spring-kafka 消费者,它读取记录并将其移交给缓存。计划任务会定期清除缓存中的记录。我想仅在批处理成功保存到数据库后更新 COMMIT OFFSET。我尝试将确认对象传递给缓存服务以调用确认方法,如下所示。
public class KafkaConsumer {
@KafkaListener( topicPattern = "${kafka.topicpattern}", containerFactory = "kafkaListenerContainerFactory" )
public void receive( ConsumerRecord<String, String> record, Acknowledgment acknowledgment ) {
cacheService.add( record.value(), acknowledgment );
}
}
public class CacheService {
// concurrency handling has been left out in favor of readability
public void add( String record, Acknowledgment acknowledgment ) {
this.records.add(record);
this.lastAcknowledgment = acknowledgment;
}
public void saveBatch() { //called by scheduled task
if( records.size() == BATCH_SIZE ) {
// perform batch insert into database
this.lastAcknowledgment.acknowledge();
this.records.clear();
}
}
}
AckMode 已设置如下:
factory.getContainerProperties().setAckMode( AbstractMessageListenerContainer.AckMode.MANUAL );
自动提交是错误的:
config.put( ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false );
即使调用了acknowledge方法,提交偏移量也不会更新。持久化记录后更新提交偏移量的最佳方法是什么?
我正在使用 spring-kafka 2.1.7.RELEASE。
编辑:@GaryRussell confirmed之后由于外部线程所做的确认由消费者线程在下一次轮询期间执行,我重新检查了我的代码,发现最后一个确认对象的设置方式存在错误。 修复此问题后,提交偏移量将按预期更新。所以这个问题已经解决了。但是,我无法将此问题标记为已回答。
最佳答案
问题来了,消费者线程负责提交偏移量。在轮询时,消费者线程将提交之前的批处理偏移量。
由于在您的情况下 AUTO_COMMIT
为 false,并且 lastAcknowledgment.acknowledge()
未确认偏移量未提交。
只有一种方法可以做到这一点,一旦获得轮询记录,就将 Schedule
任务作为 Async
并保持消费者线程并在异步完成后提交偏移量任务,查看此答案以供引用answer
注意如果您持有消费者线程超过 5 分钟,将会发生重新平衡 here
he new Java Consumer now supports heartbeating from a background thread. There is a new configuration max.poll.interval.ms which controls the maximum time between poll invocations before the consumer will proactively leave the group (5 minutes by default). The value of the configuration request.timeout.ms must always be larger than
max.poll.interval.ms
because this is the maximum time that a JoinGroup request can block on the server while the consumer is rebalancing, so we have changed its default value to just above 5 minutes. Finally, the default value ofsession.timeout.ms
has been adjusted down to 10 seconds, and the default value ofmax.poll.records
has been changed to 500.
特别说明来自 spring kafka >2.1.5
对外部线程所做的确认将由消费者线程在下一次轮询之前执行,感谢 @Gary Russell 提供此信息
关于java - 批量插入成功后更新 Kafka 提交偏移量,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54383839/