scala - 如何解决Kafka Consumer轮询超时错误

标签 scala apache-kafka vagrant kafka-consumer-api

我正在尝试通过 Vagrant 机器使用 Apache Kafka 来运行一个简单的 Kafka 消费者程序。当程序尝试调用 .poll(100) 方法时,它在 for 循环之前卡住了。

很多人为了调试而深入挖掘更深层次的类,但发现的并不多。

val TOPIC="testTopic"

val  props = new Properties()
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.10:9092")
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
props.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());

props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

val consumer = new KafkaConsumer[String, String](props)

consumer.subscribe(util.Collections.singletonList(TOPIC))

while(true) {
  println("Test")
  val records = consumer.poll(100)
  for (record <- records.asScala) {
    println(record)
  }
  println("Test2")
}

当前输出测试,然后卡住了,没有错误消息。预计会输出Kafka主题的内容。

最佳答案

您需要升级您的kafka-clients 版本到2.0.0 或更高版本。当 kafka 服务器关闭时,例如,使用 KafkaConsumer 类中的 poll 方法,您将陷入内部循环,等待代理再次可用。

根据 KIP-266 :

ConsumerRecords

poll​(long timeout)

Deprecated. Since 2.0. Use poll(Duration), which does not block beyond the timeout awaiting partition assignment. See KIP-266 for more information.

在你的情况下:

import org.apache.kafka.clients.consumer.KafkaConsumer; 
import scala.concurrent.duration._

// ...
val timeout = Duration(100, MILLISECONDS) 

while(true) {
  println("Test")
  val records = consumer.poll(timeout)
  for (record <- records.asScala) {
    println(record)
  }
  println("Test2")
}

//...

总之,您只需要导入新版本的KafkaConsumer 类,并将超时参数作为Duration 对象的实例传递给新的poll 方法。

关于scala - 如何解决Kafka Consumer轮询超时错误,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56534248/

相关文章:

windows - SSH 到 Windows 中的 Vagrant box?

chef-infra - 使用 Vagrant 和 chef-solo 的用户设置 MySQL 数据库

scala - 无法使用 Akka-Http 验证 OAuth2

elasticsearch - 如何为Elasticsearch产生Confluent的Kafka虚拟数据生成器(datagen)消息?

python - 如何在kafka消费者中读取和处理高优先级消息?

go - Kafka 0.11/Golang Sarama 版本支持

vagrant - PhpStorm + Xdebug : Connection established, PhpStorm 中没有弹出调试窗口

android - 如何在 android studio gradle 中降级 proguard 版本?

function - 奇怪的编译器错误 : found Unit, 我尝试返回 Double

Play2.2.1框架中的Json Scala对象序列化