java - CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member

Tags: java apache-kafka

I am using Kafka 0.10.2 and am now hitting a CommitFailedException, like this:

Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.

I have already set max.poll.interval.ms to Integer.MAX_VALUE. So can anyone tell me why this still happens even though I set that value?

Another question: I set session.timeout.ms to 60000 as described, but it still happens. I tried to reproduce it with a simple piece of code:

    public static void main(String[] args) throws InterruptedException {
        Logger logger = Logger.getLogger(KafkaConsumer10.class);
        logger.info("XX");
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka-broker:9098");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("max.poll.interval.ms", "300000");
        props.put("session.timeout.ms", "10000");
        props.put("max.poll.records", "2");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("t1"));
        while (true) {
            Thread.sleep(11000);
            ConsumerRecords<String, String> records = consumer.poll(100);
            Thread.sleep(11000);
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
    }

When I set session.timeout.ms to 10000, I tried sleeping for more than 10000 ms inside my poll loop, but it seems to work fine and no exception is thrown. So I am confused about this. If the heartbeat is triggered by consumer.poll and consumer.commit, then in my code the interval between heartbeats seems to exceed the session timeout. Why is no CommitFailedException thrown?
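The exception message suggests reducing max.poll.records because the time between successive poll() calls must stay below max.poll.interval.ms. A minimal, self-contained sketch of that budget check (the per-record processing time is an illustrative assumption, not a measurement from the question):

```java
// Illustrative only: checks whether a poll loop's processing budget
// fits inside max.poll.interval.ms. perRecordMs is an assumed value.
public class PollBudget {
    public static void main(String[] args) {
        long maxPollIntervalMs = 300_000; // max.poll.interval.ms from the question's config
        int maxPollRecords = 2;           // max.poll.records from the question's config
        long perRecordMs = 11_000;        // assumed per-record processing time

        // Worst-case gap between two poll() calls: process a full batch
        long timeBetweenPolls = (long) maxPollRecords * perRecordMs;
        System.out.println("worst-case time between polls = " + timeBetweenPolls + " ms");
        System.out.println("within max.poll.interval.ms? "
                + (timeBetweenPolls <= maxPollIntervalMs));
    }
}
```

If the worst-case gap exceeds max.poll.interval.ms, lowering max.poll.records (or speeding up per-record processing) is the lever the error message is pointing at.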

Best Answer

The session.timeout.ms consumer setting must be smaller than the group.max.session.timeout.ms setting on the Kafka broker.

That solved the problem for me.
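As a quick sanity check, you can compare the consumer's session.timeout.ms against the broker's group.min.session.timeout.ms / group.max.session.timeout.ms bounds. The broker values below are assumed defaults (6000 and 300000 ms); verify them against your broker's server.properties:

```java
import java.util.Properties;

// Sketch: verify a consumer session timeout falls inside the broker's
// accepted range. Broker bounds here are assumed defaults, not read
// from a live broker.
public class SessionTimeoutCheck {
    public static void main(String[] args) {
        // Consumer setting from the question
        Properties props = new Properties();
        props.put("session.timeout.ms", "60000");

        long brokerMin = 6_000;   // group.min.session.timeout.ms (assumed default)
        long brokerMax = 300_000; // group.max.session.timeout.ms (assumed default)

        long sessionTimeout = Long.parseLong(props.getProperty("session.timeout.ms"));
        boolean accepted = sessionTimeout >= brokerMin && sessionTimeout <= brokerMax;
        System.out.println("session.timeout.ms within broker bounds? " + accepted);
    }
}
```

A session.timeout.ms outside these bounds is rejected when the consumer tries to join the group, so keeping the value inside the broker's configured range is a precondition for the commit to succeed at all.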

Credit goes to the GitHub issue Commit Failures.

This question about java - CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member is based on a similar question on Stack Overflow: https://stackoverflow.com/questions/45560255/
