我对处理因超过超时值而导致消费者死亡的问题有疑问。
我的示例配置:
session.timeout.ms = 10000 (10 seconds)
heartbeat.interval.ms = 2000 (2 seconds)
max.poll.interval.ms = 300000 (5 minutes)
我有 1 个主题、10 个分区、1 个消费者组、10 个消费者(1 个分区 = 1 个消费者)。
根据我的理解,在 Kafka 中消费消息非常简单,工作原理如下:
- 消费者对主题的 100 条记录进行投票
- 向经纪商发送
heartbeat
信号 - 正在处理记录
- 处理记录完成
- 完成处理(提交、不执行任何操作等)
- 循环重复#1-5
我的问题是,如果 heartbeats
之间的时间比之前配置的 session.timeout.ms
之间的时间长,会发生什么。我理解这部分,如果 session 超时,代理会初始化重新平衡,处理时间超过 session.timeout.ms
值的消费者被标记为 dead
,另一个消费者是该分区的 assigned/subscribed
。
好吧,但是然后呢...?
- 该长时间处理的消费者是否已从主题中删除/取消订阅,并且我的应用程序只剩下
9
个正在工作的消费者?如果所有消费者都超过超时并且都被认为已死亡怎么办,我是否会剩下一个正在运行的应用程序,因为没有消费者而什么也不做? - 长处理消费者在重新平衡发生后完成处理,代理是否再次初始化重新平衡并重新为消费者分配分区?据我了解,它继续循环运行 #1-5 并向代理发送
heartbeat
还会初始化将消费者添加到消费者组的过程,在给出dead
状态后将其从消费者组中删除,对吗? - 应用程序抛出某种异常,表明超出了
session.timeout.ms
并且处理突然停止? - 另外,
max.poll.interval.ms
属性又如何呢?如果我们甚至超过该时间段并且消费者 X 在max.poll.interval.ms
值之后完成处理怎么办? Consumer 已经超出了session.timeout.ms
值,被排除在 Consumer Group 之外,状态设置为dead
,这给我们配置 Kafka Consumer 带来了什么不同?
我们有一个提取数据进行处理的过程,这个提取由 50 多个 SQL 查询组成(大部分是 SELECT,很少是 UPDATES),它们通常运行得很快,但当然一切都取决于数据库负载和可能的锁等。处理时间可能比 session 超时时间还要长。我不想无限增加 session 超时,直到“我恰到好处”。该过程是幂等的,如果在 X 分钟内重复 X 次,我们不在乎。
最佳答案
请找出答案。
#1。是的。如果你的所有消费者实例由于session.timeout而被踢出消费者组,那么你将只剩下零个消费者实例,最终消费者应用程序会死掉,除非你重新启动。
#2。这取决于您如何编写与 poll() 和消费者记录迭代相关的消费者代码。如果您有适当的 while(true) 并在内部尝试和捕获,您的消费者将能够在处理该长时间运行的记录后重新加入消费者组。
#3。您最终会遇到提交失败的异常:
失败:提交无法完成,因为组已重新平衡并将分区分配给另一个成员。这意味着后续调用 poll() 之间的时间比配置的 max.poll.interval.ms 长,这通常意味着 poll 循环花费了太多时间处理消息。您可以通过增加 max.poll.interval.ms 或使用 max.poll.records 减少 poll() 中返回的批处理的最大大小来解决此问题。
这又取决于您的代码,自动加入消费者组。
#4。答案就在这里
session.timeout.ms
消费者在仍处于状态时可以与经纪人失去联系的时间 视为存活默认为 3 秒。如果超过 session.timeout.ms 如果消费者没有向组协调器发送心跳,则认为 死亡并且组协调器将触发消费者组的重新平衡 将失效消费者的分区分配给组中的其他消费者。这 属性与heartbeat.interval.ms密切相关。 heartbeat.interval.ms con- 控制 KafkaConsumer poll() 方法向 KafkaConsumer poll() 方法发送心跳的频率 组协调员,而 session.timeout.ms 控制消费者可以多长时间 不发送心跳就走。因此,这两个属性通常是可变的 结合在一起 — heatbeat.interval.ms 必须低于 session.timeout.ms,并且 通常设置为超时值的三分之一。所以如果 session.timeout.ms 是 3 秒- onds,heartbeat.interval.ms 应为 1 秒。设置session.timeout.ms 低于默认值将允许消费者组检测故障并从故障中恢复 更快,但也可能因消费者采取措施而导致不必要的重新平衡 完成轮询循环或垃圾收集的时间更长。设置session.timeout.ms 更高会减少意外重新平衡的机会,但也意味着需要 检测真正故障的时间更长。
关于apache-kafka - Kafka消费者死亡处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/72615626/