我想了解 kafkaConsumer.poll() 方法的行为 我将消费者配置为不自动提交
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", bootstrapAddress);
KafkaConsumer consumer = new KafkaConsumer(properties);
据我了解,根据 Javadoc,如果我这样做
ConsumerRecords firstBatch = consumer.poll(0l);
ConsumerRecords secondBatch = consumer.poll(0l);
firstBatch
和 secondBatch
都应该包含相同的 ConsumerRecords
,假设主题中只有一个分区,因为偏移量没有已 promise 。
我的假设正确吗?我的问题是,每次调用 consumer.poll(0l)
时,都会获取下一批 ConsumerRecords
最佳答案
Both firstBatch and secondBatch should contain the same ConsumerRecords
这是错误的,即使禁用自动offset
或手动提交offset
,Kafka消费者偏移量也会在每次后续轮询中自动增加
The position of the consumer gives the offset of the next record that will be given out. It will be one larger than the highest offset the consumer has seen in that partition. It automatically advances every time the consumer receives messages in a call to poll(long)
The committed position is the last offset that has been stored securely. Should the process fail and restart, this is the offset that the consumer will recover to. The consumer can either automatically commit offsets periodically; or it can choose to control this committed position manually by calling one of the commit APIs (e.g. commitSync and commitAsync).
你的假设在另一方面是正确的,当offset
未提交并且kafka消费者重新启动时,它将轮询旧批处理或从提交旧偏移量的开头开始。
关于java - 即使禁用自动偏移,卡夫卡消费者民意调查也不会读取同一批处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58459431/