我使用的是 kafka 0.10.2,现在遇到了 CommitFailedException。喜欢:
Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
我已将 max.poll.interval.ms 设置为 Integer.MAX_VALUE。所以谁能告诉我为什么即使我设置了值仍然会发生这种情况?
另一个问题是: 我按照描述将 session.timeout.ms 设置为 60000,但它仍然发生。我尝试通过一个简单的代码重现
public static void main(String[] args) throws InterruptedException {
Logger logger = Logger.getLogger(KafkaConsumer10.class);
logger.info("XX");
Properties props = new Properties();
props.put("bootstrap.servers", "kafka-broker:9098");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("max.poll.interval.ms", "300000");
props.put("session.timeout.ms", "10000");
props.put("max.poll.records", "2");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("t1"));
while (true) {
Thread.sleep(11000);
ConsumerRecords<String, String> records = consumer.poll(100);
//Thread.sleep(11000);
Thread.sleep(11000);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
当我将 session.timeout.ms 设置为 10000 时,我尝试在我的轮询循环中 hibernate 超过 10000 毫秒,但它似乎有效并且没有异常。所以我对此感到困惑。如果心跳是由 consumer.poll 和 consumer.commit 触发的,那么在我的代码中心跳似乎超出了 session 超时。为什么不抛出 CommitFailedException ?
最佳答案
session.timeout.ms
consumer 设置应该小于 Kafka broker 设置的group.max.session.timeout.ms
。
这为我解决了这个问题。
归功于 github 链接 Commit Failures
关于java - CommitFailedException 无法完成提交,因为该组已经重新平衡并将分区分配给另一个成员,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45560255/