apache-kafka - Kafka消费者死亡处理

标签 apache-kafka kafka-consumer-api

我对处理因超过超时值而导致消费者死亡的问题有疑问。

我的示例配置:

session.timeout.ms = 10000 (10 seconds)
heartbeat.interval.ms = 2000 (2 seconds)
max.poll.interval.ms = 300000 (5 minutes)

我有 1 个主题、10 个分区、1 个消费者组、10 个消费者(1 个分区 = 1 个消费者)。

根据我的理解,在 Kafka 中消费消息非常简单,工作原理如下:

  1. 消费者对主题的 100 条记录进行投票
  2. 向经纪商发送 heartbeat 信号
  3. 正在处理记录
  4. 处理记录完成
  5. 完成处理(提交、不执行任何操作等)
  6. 循环重复#1-5

我的问题是,如果 heartbeats 之间的时间比之前配置的 session.timeout.ms 之间的时间长,会发生什么。我理解这部分,如果 session 超时,代理会初始化重新平衡,处理时间超过 session.timeout.ms 值的消费者被标记为 dead,另一个消费者是该分区的 assigned/subscribed

好吧,但是然后呢...?

  1. 该长时间处理的消费者是否已从主题中删除/取消订阅,并且我的应用程序只剩下 9 个正在工作的消费者?如果所有消费者都超过超时并且都被认为已死亡怎么办,我是否会剩下一个正在运行的应用程序,因为没有消费者而什么也不做?
  2. 长处理消​​费者在重新平衡发生后完成处理,代理是否再次初始化重新平衡并重新为消费者分配分区?据我了解,它继续循环运行 #1-5 并向代理发送 heartbeat 还会初始化将消费者添加到消费者组的过程,在给出 dead 状态后将其从消费者组中删除,对吗?
  3. 应用程序抛出某种异常,表明超出了 session.timeout.ms 并且处理突然停止?
  4. 另外,max.poll.interval.ms 属性又如何呢?如果我们甚至超过该时间段并且消费者 X 在 max.poll.interval.ms 值之后完成处理怎么办? Consumer 已经超出了 session.timeout.ms 值,被排除在 Consumer Group 之外,状态设置为 dead ,这给我们配置 Kafka Consumer 带来了什么不同?

我们有一个提取数据进行处理的过程,这个提取由 50 多个 SQL 查询组成(大部分是 SELECT,很少是 UPDATES),它们通常运行得很快,但当然一切都取决于数据库负载和可能的锁等。处理时间可能比 session 超时时间还要长。我不想无限增加 session 超时,直到“我恰到好处”。该过程是幂等的,如果在 X 分钟内重复 X 次,我们不在乎。

最佳答案

请找出答案。

#1。是的。如果你的所有消费者实例由于session.timeout而被踢出消费者组,那么你将只剩下零个消费者实例,最终消费者应用程序会死掉,除非你重新启动。

#2。这取决于您如何编写与 poll() 和消费者记录迭代相关的消费者代码。如果您有适当的 while(true) 并在内部尝试和捕获,您的消费者将能够在处理该长时间运行的记录后重新加入消费者组。

#3。您最终会遇到提交失败的异常:

失败:提交无法完成,因为组已重新平衡并将分区分配给另一个成员。这意味着后续调用 poll() 之间的时间比配置的 max.poll.interval.ms 长,这通常意味着 poll 循环花费了太多时间处理消息。您可以通过增加 max.poll.interval.ms 或使用 max.poll.records 减少 poll() 中返回的批处理的最大大小来解决此问题。

这又取决于您的代码,自动加入消费者组。

#4。答案就在这里

session.timeout.ms

消费者在仍处于状态时可以与经纪人失去联系的时间 视为存活默认为 3 秒。如果超过 session.timeout.ms 如果消费者没有向组协调器发送心跳,则认为 死亡并且组协调器将触发消费者组的重新平衡 将失效消费者的分区分配给组中的其他消费者。这 属性与heartbeat.interval.ms密切相关。 heartbeat.interval.ms con- 控制 KafkaConsumer poll() 方法向 KafkaConsumer poll() 方法发送心跳的频率 组协调员,而 session.timeout.ms 控制消费者可以多长时间 不发送心跳就走。因此,这两个属性通常是可变的 结合在一起 — heatbeat.interval.ms 必须低于 session.timeout.ms,并且 通常设置为超时值的三分之一。所以如果 session.timeout.ms 是 3 秒- onds,heartbeat.interval.ms 应为 1 秒。设置session.timeout.ms 低于默认值将允许消费者组检测故障并从故障中恢复 更快,但也可能因消费者采取措施而导致不必要的重新平衡 完成轮询循环或垃圾收集的时间更长。设置session.timeout.ms 更高会减少意外重新平衡的机会,但也意味着需要 检测真正故障的时间更长。

关于apache-kafka - Kafka消费者死亡处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/72615626/

相关文章:

scala - 来自kafka的spark Streaming如何指定轮询事件的截止时间

java - 如何为 onFailure 事件设置超时(Spring,Kafka)?

Python kafka消费者不会消费来自生产者的消息

java - 如何从java应用程序订阅apache kafka中的主题?

java - 在 Kafka 消费者配置中将 max.poll.interval.ms 设置为大于 request.timeout.ms 的负面影响是什么

apache-kafka - 当分区暂停并重新平衡时会发生什么?

apache-kafka - 是否可以为 Google Pub/Sub 主题定义架构,例如在 Kafka 中使用 AVRO?

java - 一个处理HDFS数据的Kafka消费者应该跑到哪里去?

apache-kafka - Kafka 消费者获取主题元数据失败

apache-spark - kafka kafka-consumer-groups.sh --describe 不返回消费者组的输出