如果我不想使用自动提交模式 - sping 提供了另一种方法来做到这一点。
spring-kafkfa/#committing-offsets给我们以下关于提交偏移量的信息:
RECORD
- commit the offset when the listener returns after processing the record.
BATCH
- commit the offset when all the records returned by the poll() have been processed.
TIME
- commit the offset when all the records returned by the poll() have been processed as long as the ackTime since the last commit has been exceeded.
COUNT
- commit the offset when all the records returned by the poll() have been processed as long as ackCount records have been received since the last commit.
COUNT_TIME
- similar to TIME and COUNT but the commit is performed if either condition is true.
MANUAL
- the message listener is responsible to acknowledge() the Acknowledgment; after which, the same semantics as BATCH are applied.
MANUAL_IMMEDIATE
- commit the offset immediately when the Acknowledgment.acknowledge() method is called by the listener.
我有几个问题:
TIME
据我了解,spring 框架中的某个地方存在循环while(true){
data = consumer.poll();
data.foreach(record->listener.listen(record))
}
民意调查多久发生一次?
时间是提交抵消的唯一标准吗?
假设 poll 返回了 100 条记录,当 ackTime 过期时 - 只处理了 60 条记录?
我没有注意到
MANUAL_IMMEDIATE
之间的区别和 MANUAL
请为我澄清这些问题。
附言
据我了解,Garry Russel 的 foreach 答案如下:
while(true){
data = consumer.poll();
data.foreach(record->new Thread(()->listener.listen(record)).start());
}
最佳答案
这取决于版本;最近的 1.3 版本有一个更简单的线程模型,由 KIP-62 促进.
在那个版本中,监听器在调用者线程上被调用;直到所有当前记录都被消耗掉后才会进行下一次轮询。除了 RECORD
(和 MANUAL*
),提交的决定是在所有记录发送到监听器后确定的。MANUAL_IMMEDIATE
就是这个意思;当用户确认时立即提交偏移量;与 MANUAL
,在所有记录发送到监听器后提交手动偏移量。
较早的版本稍微复杂一些;可以获取一个或两个额外的批次,并且在每次轮询之前执行确认,因此可能会在第一批中的所有记录发送到监听器之前提交偏移量。
编辑
在下面回答您的评论...
是的;线程在 1.3 中发生了变化。在此之前,我们必须不断轮询消费者以避免代理重新平衡分区。 <= 1.2,ConsumerRecords
通过深度为 1 的队列将监听器线程移交给监听器线程。轮询器不断轮询直到无法容纳更多 ConsumerRecords
在队列中;那个时候,它pause
s 消费者(以便随后的民意调查不返回记录),但我们仍然必须继续调用 poll
以避免重新平衡。当监听器 catch 时,消费者是resume
d 和消息再次开始流动。
因此,最坏的情况是容器包含 3 组记录 - 当前正在由监听器处理的一组记录,队列中的一组记录以及我们无法放入队列中的一组记录。任何未完成的偏移提交(手动或其他方式)都在每次轮询之前执行。
Will poll thread await processor thread?
不;我们不能这样做,因为这会导致重新平衡——就像我们在消费者线程上调用监听器一样。
KIP-62其实是在0.10.1.0客户端修复的,但是我们直到1.3才改变线程;由于 KIP-62,这是一个重大的简化,我建议使用该版本。
关于java - Spring-kafka 偏移提交方式,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46767598/