我正在尝试通过 Vagrant 机器使用 Apache Kafka 来运行一个简单的 Kafka 消费者程序。当程序尝试调用 .poll(100) 方法时,它在 for 循环之前卡住了。
很多人为了调试而深入挖掘更深层次的类,但发现的并不多。
val TOPIC="testTopic"
val props = new Properties()
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.10:9092")
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
props.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
val consumer = new KafkaConsumer[String, String](props)
consumer.subscribe(util.Collections.singletonList(TOPIC))
while(true) {
println("Test")
val records = consumer.poll(100)
for (record <- records.asScala) {
println(record)
}
println("Test2")
}
当前输出测试,然后卡住了,没有错误消息。预计会输出Kafka主题的内容。
最佳答案
您需要升级您的kafka-clients 版本到2.0.0 或更高版本。当 kafka 服务器关闭时,例如,使用 KafkaConsumer
类中的 poll 方法,您将陷入内部循环,等待代理再次可用。
根据 KIP-266 :
ConsumerRecords
poll(long timeout)
Deprecated. Since 2.0. Use poll(Duration), which does not block beyond the timeout awaiting partition assignment. See KIP-266 for more information.
在你的情况下:
import org.apache.kafka.clients.consumer.KafkaConsumer;
import scala.concurrent.duration._
// ...
val timeout = Duration(100, MILLISECONDS)
while(true) {
println("Test")
val records = consumer.poll(timeout)
for (record <- records.asScala) {
println(record)
}
println("Test2")
}
//...
总之,您只需要导入新版本的KafkaConsumer
类,并将超时参数作为Duration
对象的实例传递给新的poll 方法。
关于scala - 如何解决Kafka Consumer轮询超时错误,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56534248/