如下所示,我的代码是一个高级消费者在kafka服务器中获取具有32个分区的主题,我很困惑为什么有时我从consumer.poll()得到空返回。 我尝试增加轮询超时,然后当我将超时增加到1000时,然后每次轮询都有返回数据,而我将超时设置为10或0,然后我看到很多空返回。
谁能告诉我如何设置正确的超时?
def main(args: Array[String]): Unit = {
val props = new Properties()
props.put("bootstrap.servers", "kafka-01:9098")
props.put("group.id", "kch1")
props.put("enable.auto.commit", "true")
props.put("auto.commit.interval.ms", "1000")
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
//props.put("max.poll.records", "1000")
val consumers = new Array[KafkaConsumer[String, String]](16)
for(i <- 0 to 15) {
consumers(i) = new KafkaConsumer[String, String](props)
consumers(i).subscribe(util.Arrays.asList("veh321"))
}
var cnt = 0
var cacheIterator: Iterator[ConsumerRecord[String, String]] = null
for(i <- 0 to 15) {
new Thread(new Runnable {
override def run(): Unit = {
var finish = false
while(!finish) {
val start = System.currentTimeMillis()
cacheIterator = consumers(i).poll(100).iterator()
val end = System.currentTimeMillis() - start
if (end > 10 ) {
println(s"${Thread.currentThread().getId} + Duration is ${end}, ${cacheIterator.hasNext} ${cacheIterator.size}")
}
}
}
}).start()
}
最佳答案
Java Consumer通过调用java.nio.channels.Selector.select(timeout),采用Linux的epoll作为底层实现方案。如果您只给它 100 毫秒的时间来尝试在这么短的时间间隔内有多少个 SelectionKey 准备就绪,那么它很可能不会返回任何内容。
此外,在这100毫秒内,消费者还会做一些其他工作,包括轮询协调器状态,因此记录轮询的实际时间间隔显然小于100毫秒,这使得检索一些真正有用的东西变得更加困难。
关于java - 当kafka消费者轮询返回空记录时?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43603931/