java - Spring-kafka 偏移提交方式

标签 java spring kafka-consumer-api spring-kafka

如果我不想使用自动提交模式 - sping 提供了另一种方法来做到这一点。

spring-kafkfa/#committing-offsets给我们以下关于提交偏移量的信息:

RECORD - commit the offset when the listener returns after processing the record.
BATCH - commit the offset when all the records returned by the poll() have been processed.
TIME - commit the offset when all the records returned by the poll() have been processed as long as the ackTime since the last commit has been exceeded.
COUNT - commit the offset when all the records returned by the poll() have been processed as long as ackCount records have been received since the last commit.
COUNT_TIME - similar to TIME and COUNT but the commit is performed if either condition is true.
MANUAL - the message listener is responsible to acknowledge() the Acknowledgment; after which, the same semantics as BATCH are applied.
MANUAL_IMMEDIATE - commit the offset immediately when the Acknowledgment.acknowledge() method is called by the listener.



我有几个问题:
TIME据我了解,spring 框架中的某个地方存在循环
while(true){
   data = consumer.poll();
   data.foreach(record->listener.listen(record))
}

民意调查多久发生一次?

时间是提交抵消的唯一标准吗?
假设 poll 返回了 100 条记录,当 ackTime 过期时 - 只处理了 60 条记录?

我没有注意到 MANUAL_IMMEDIATE 之间的区别和 MANUAL
请为我澄清这些问题。

附言

据我了解,Garry Russel 的 foreach 答案如下:
while(true){
   data = consumer.poll();
   data.foreach(record->new Thread(()->listener.listen(record)).start());
}

最佳答案

这取决于版本;最近的 1.3 版本有一个更简单的线程模型,由 KIP-62 促进.

在那个版本中,监听器在调用者线程上被调用;直到所有当前记录都被消耗掉后才会进行下一次轮询。除了 RECORD (和 MANUAL* ),提交的决定是在所有记录发送到监听器后确定的。
MANUAL_IMMEDIATE就是这个意思;当用户确认时立即提交偏移量;与 MANUAL ,在所有记录发送到监听器后提交手动偏移量。

较早的版本稍微复杂一些;可以获取一个或两个额外的批次,并且在每次轮询之前执行确认,因此可能会在第一批中的所有记录发送到监听器之前提交偏移量。

编辑

在下面回答您的评论...

是的;线程在 1.3 中发生了变化。在此之前,我们必须不断轮询消费者以避免代理重新平衡分区。 <= 1.2,ConsumerRecords通过深度为 1 的队列将监听器线程移交给监听器线程。轮询器不断轮询直到无法容纳更多 ConsumerRecords在队列中;那个时候,它pause s 消费者(以便随后的民意调查不返回记录),但我们仍然必须继续调用 poll以避免重新平衡。当监听器 catch 时,消费者是resume d 和消息再次开始流动。

因此,最坏的情况是容器包含 3 组记录 - 当前正在由监听器处理的一组记录,队列中的一组记录以及我们无法放入队列中的一组记录。任何未完成的偏移提交(手动或其他方式)都在每次轮询之前执行。

Will poll thread await processor thread?



不;我们不能这样做,因为这会导致重新平衡——就像我们在消费者线程上调用监听器一样。

KIP-62其实是在0.10.1.0客户端修复的,但是我们直到1.3才改变线程;由于 KIP-62,这是一个重大的简化,我建议使用该版本。

关于java - Spring-kafka 偏移提交方式,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46767598/

相关文章:

java - 如何从其他类方法访问 ConcurrentHashMap 元素

java - 寻找一种通过单击按钮来更改 JTextField 中文本的简单方法

java - Spring无法通过属性标签注入(inject)值

java - 如何在 host monster 提供的服务器上运行 java web 应用程序

apache-kafka - 如何在kafka中创建一个新的消费者组

java - 主要使用 javascript 和 JMeter 进行负载测试 Tapestry 应用程序

java - 为什么我收到错误 "ERROR Could not find value for key log4j.appender.CONSOLE"?

java - RESTEasy 不会将我的 Spring bean 映射到自定义 Spring ContextLoader

java - Apache Kafka 和 Avro : org. apache.avro.generic.GenericData$Record 无法转换为 com.harmeetsingh13.java.Customer

apache-kafka - 再次重新处理/读取 Kafka 记录/消息 - 消费者组偏移重置的目的是什么?