apache-kafka - 默认情况下,max.poll.intervals.ms设置为int.Max

标签 apache-kafka apache-kafka-streams

Apache Kafka文档指出:

The internal Kafka Streams consumer max.poll.interval.ms default value was changed from 300000 to Integer.MAX_VALUE



由于此值用于检测一批记录的处理时间何时超过给定的阈值,因此是否有这种“无限”值的原因?

它使应用程序变得无响应吗?还是在处理时间过长时,Kafka Streams有另一种离开消费者群体的方式?

最佳答案

Does it enable applications to become unresponsive? Or Kafka Streams has a different way to leave the consumer group when the processing is taking too long?


Kafka Streams在这种情况下利用了Kafka消费者客户端的心跳功能,因此将心跳(“此应用程序实例是否还活着吗?”)与对poll()的调用分离了。两个主要参数是session.timeout.ms(用于心跳线程)和max.poll.interval.ms(用于处理线程),它们的区别在https://stackoverflow.com/a/39759329/1743580上进行了更详细的描述。
引入了心跳信号,以便可以使应用程序实例花费大量时间来处理记录,而不会被认为“没有取得进展”,因此“无济于事”。例如,您的应用程序可以在一分钟内完成一次单个记录的大量处理,同时仍然对Kafka保持心跳:“嘿,我还活着,我正在取得进步。但是我还没有完成处理工作。 敬请关注。”
当然,您可以将max.poll.interval.ms从其默认值(Integer.MAX_VALUE)更改为较低的设置,例如,如果您确实希望将您的应用实例视为“死”,如果它在轮询记录之间花费的时间超过X秒,那么如果处理最新一轮记录需要的时间超过X秒。这样的配置是否有意义取决于您的特定用例-在大多数情况下,默认设置是安全的选择。

session.timeout.ms: The timeout used to detect consumer failures when using Kafka's group management facility. The consumer sends periodic heartbeats to indicate its liveness to the broker. If no heartbeats are received by the broker before the expiration of this session timeout, then the broker will remove this consumer from the group and initiate a rebalance. Note that the value must be in the allowable range as configured in the broker configuration by group.min.session.timeout.ms and group.max.session.timeout.ms.

max.poll.interval.ms: The maximum delay between invocations of poll() when using consumer group management. This places an upper bound on the amount of time that the consumer can be idle before fetching more records. If poll() is not called before expiration of this timeout, then the consumer is considered failed and the group will rebalance in order to reassign the partitions to another member.

关于apache-kafka - 默认情况下,max.poll.intervals.ms设置为int.Max,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47906485/

相关文章:

java - Apache Camel 与 Kafka Producer 的内存泄漏

kubernetes - 在Kubernetes中,Kafka Connect部署无法连接到Kafka Broker服务(内部图)

docker - Kafka stream.allMetadata()在DOCKER中返回空列表(交互式查询)

mongodb - 如何告诉 MongoSource(使用 Kafka Connect)序列化什么 Key

apache-kafka - 一个kafka消费者可以属于多个消费者组吗?

apache-spark - 无法将 Catalyst 类型 IntegerType 转换为 Avro 类型 ["null","int"]

apache-kafka - Kafka 流 - 加入两个 ktables 调用连接函数两次

java - 是否可以对 GlobalKTable 进行 ReKey?

apache-kafka - 流消息到多个主题

apache-kafka - KStream-KStream leftJoin 在窗口到期后不一致地发出