java - 卡夫卡 throw "org.apache.kafka.clients.consumer.CommitFailedException"

标签 java apache-kafka kafka-consumer-api spring-kafka

我使用 spring-kafka 库开发了 Kafka 消费者应用程序,并使用默认消费者配置和手动提交。

我正在运行两个应用程序实例,监听两个不同的 Kafka 主题。在执行负载测试时,我发现只有一个更高负载的应用程序出现以下错误:

Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. 

    This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, 
    which typically implies that the poll loop is spending too much time message processing. 

    You can address this either by increasing the session timeout or by reducing the maximum size of batches 
    returned in poll() with max.poll.records.


org.apache.kafka.clients.consumer.CommitFailedException: 
    Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. 

    This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, 
    which typically implies that the poll loop is spending too much time message processing. 


You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
    \n org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:725)

我阅读了几篇文章,发现如果消费者花费大量时间来处理消息,而代理没有获取有关消费者活跃度的信息,那么消费者就会发生重新平衡,并且对于未提交的消息将抛出上述异常。

我已通过将 max.poll.interval.ms 设置为 INEGER.MAX_VALUE 解决了上述错误。 但我想知道为什么我仅在一个实例中出现上述错误,以及为什么其他实例在更高负载下按预期工作。

任何人都可以分享 max.poll.interval.ms 的正确根本原因和理想值或此问题的适当解决方案

最佳答案

除了 Yoav 的建议外,如果无法减小批量大小,您还可以尝试增大 max.poll.interval.ms 的值。来自 Kafka 文档:

The maximum delay between invocations of poll() when using consumer group management. This places an upper bound on the amount of time that the consumer can be idle before fetching more records. If poll() is not called before expiration of this timeout, then the consumer is considered failed and the group will rebalance in order to reassign the partitions to another member.

关于java - 卡夫卡 throw "org.apache.kafka.clients.consumer.CommitFailedException",我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51537244/

相关文章:

java - 什么是NullPointerException,我该如何解决?

java - String.xml 中的字符串和整数的问题

apache-kafka - Kafka - 主题 & 分区 & 消费者

rest - Kafka 消息 VS REST 调用

java - 为什么我的 Kafka 消费者投票这么快?

java.lang.NoSuchMethodException : com. sun.proxy .....关于struts2 Action

java - 枚举 - 静态和实例 block

java - 从 Kafka Streams 反序列化对象时出错

scala - 无法使用Zookeeper启动kafka(kafka.common.InconsistentClusterIdException)

hadoop - 哪些技术可用于将数据从社交媒体流式传输到 hadoop?