scala - InvalidGroupIdException : in Kafka scala consumer program even after setting group. ID

标签 scala apache-kafka kafka-consumer-api

我面临着

org.apache.kafka.common.errors.InvalidGroupIdException: To use the group management or offset commit APIs, you must provide a valid group.id in the consumer configuration.



从用 Scala 编写的命令行运行以下 kafka 消费者 API 时出错。可能是什么问题?
    object KafkaAggregateConsumerApp extends App{
  try {
    val properties: Properties = new Properties()
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "0:9092") 
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer")
    properties.put("group.id", "console-consumer-myapp")

    val consumerApp = new KafkaConsumer[String, Int](properties)
    consumerApp.subscribe(Pattern.compile("kafkaaggregationsource1"))

    try {
      while (true) {
        val consumerRecord: ConsumerRecords[String, Int] = consumerApp.poll(Duration.ofMinutes(10))
        consumerRecord.forEach((each) => println(each.key() + " " + each.value()))
      }
    } finally {
      consumerApp.close()
    }
  }
  catch{
    case e: Exception => e.printStackTrace()
  }
}

最佳答案

source code我可以看到这个 InvalidGroupIdException groupId 时抛出是 null ,

private void maybeThrowInvalidGroupIdException() {
    if (groupId == null)
        throw new InvalidGroupIdException("To use the group management or offset commit APIs, you must " +
                "provide a valid " + ConsumerConfig.GROUP_ID_CONFIG + " in the consumer configuration.");
}


我建议使用 ConsumerConfig.GROUP_ID_CONFIG 设置组 ID而不是 "group.id" :
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "console-consumer-myapp")

关于scala - InvalidGroupIdException : in Kafka scala consumer program even after setting group. ID,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56202988/

相关文章:

scala - 猫 Monad 变形金刚

scala - 用Gradle构建可执行的Jar以进行加特林

java - 使用 PEM 配置 Kafka 客户端单向 SSL

scala - 匿名函数中的Scala返回语句

java - 当引导服务器关闭时,具有 transactionIdPrefix 的 DefaultKafkaProducerFactory 会无限等待

apache-spark - 每个直接流创建了多少消费者来读取记录?

java - Kafka Consumer仅在产生 'enough'数据后才读取

apache-kafka - Apache Pulsar 与 Kafka - 消费者是否从主题中提取(轮询)消息?

java - Kafka消费者组,创建消费者组时将offset设置为0

scala - 无法在本地主机中使用 Solr 连接到 ZooKeeper