java - 批量插入成功后更新 Kafka 提交偏移量

标签 java spring apache-kafka spring-kafka

我有一个 spring-kafka 消费者,它读取记录并将其移交给缓存。计划任务会定期清除缓存中的记录。我想仅在批处理成功保存到数据库后更新 COMMIT OFFSET。我尝试将确认对象传递给缓存服务以调用确认方法,如下所示。

public class KafkaConsumer {
    @KafkaListener( topicPattern = "${kafka.topicpattern}", containerFactory = "kafkaListenerContainerFactory" )
    public void receive( ConsumerRecord<String, String> record, Acknowledgment acknowledgment ) {
        cacheService.add( record.value(), acknowledgment );
    }
}

public class CacheService {
    // concurrency handling has been left out in favor of readability
    public void add( String record, Acknowledgment acknowledgment ) {
        this.records.add(record);
        this.lastAcknowledgment = acknowledgment;
    }

    public void saveBatch() { //called by scheduled task
        if( records.size() == BATCH_SIZE ) {
            // perform batch insert into database
            this.lastAcknowledgment.acknowledge();
            this.records.clear();
        }
    }
}

AckMode 已设置如下:

factory.getContainerProperties().setAckMode( AbstractMessageListenerContainer.AckMode.MANUAL );

自动提交是错误的:

config.put( ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false );

即使调用了acknowledge方法,提交偏移量也不会更新。持久化记录后更新提交偏移量的最佳方法是什么?

我正在使用 spring-kafka 2.1.7.RELEASE。


编辑:@GaryRussell confirmed之后由于外部线程所做的确认由消费者线程在下一次轮询期间执行,我重新检查了我的代码,发现最后一个确认对象的设置方式存在错误。 修复此问题后,提交偏移量将按预期更新。所以这个问题已经解决了。但是,我无法将此问题标记为已回答。

最佳答案

问题来了,消费者线程负责提交偏移量。在轮询时,消费者线程将提交之前的批处理偏移量。

由于在您的情况下 AUTO_COMMIT 为 false,并且 lastAcknowledgment.acknowledge() 未确认偏移量未提交。

只有一种方法可以做到这一点,一旦获得轮询记录,就将 Schedule 任务作为 Async 并保持消费者线程并在异步完成后提交偏移量任务,查看此答案以供引用answer

注意如果您持有消费者线程超过 5 分钟,将会发生重新平衡 here

he new Java Consumer now supports heartbeating from a background thread. There is a new configuration max.poll.interval.ms which controls the maximum time between poll invocations before the consumer will proactively leave the group (5 minutes by default). The value of the configuration request.timeout.ms must always be larger than max.poll.interval.ms because this is the maximum time that a JoinGroup request can block on the server while the consumer is rebalancing, so we have changed its default value to just above 5 minutes. Finally, the default value of session.timeout.ms has been adjusted down to 10 seconds, and the default value of max.poll.records has been changed to 500.

特别说明来自 spring kafka >2.1.5

对外部线程所做的确认将由消费者线程在下一次轮询之前执行,感谢 @Gary Russell 提供此信息

关于java - 批量插入成功后更新 Kafka 提交偏移量,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54383839/

相关文章:

java - 使用 Gradle 编写环境特定的 web.xml、log4j.xml

java - 如何从嵌套的json数据获取数组值(java android)

apache-kafka - 如何在apache kafka中处理异常和消息重新处理

scala - 使用 Spark Structured Streaming 时限制 kafka 批量大小

java - org.springframework.dao.EmptyResultDataAccessException : No class com. jea.user.ID为x的用户实体存在

java - System.setProperty 无法进行第二次调用

RESTeasy 服务中 Spring Autowiring 失败

java - 如何在 Spring Web 应用程序中添加乌尔都语语言支持

Spring//JSP//表单处理 - 在字符串上使用@ModelAttribute

java - 如何在 Kafka 中将确认设置为 false