java - Kafka消费者不消费

标签 java scala apache-kafka kafka-consumer-api

这是我从 java 客户端构建 kafka 消费者的代码。

  def buildConsumer[Key, Value](
    configuration: KafkaConfiguration, commitInterval: Long, groupId: Option[String] = None)(
    implicit keyDeserializer: Deserializer[Key], valueDeserializer: Deserializer[Value]
  ): KafkaJavaConsumer[Key, Value] = {
    val settingsMap: Map[String, Object] = Map(
      "bootstrap.servers" -> s"${configuration.bootstrapHost}:${configuration.bootstrapPort}",
      "group.id" -> groupId.getOrElse(s"${configuration.topic}-${UUID.randomUUID}"),
      "enable.auto.commit" -> "true",
      "auto.commit.interval.ms" -> commitInterval.toString,
      "auto.offset.reset" -> "earliest"
    ) ++ configuration.additionalOptions.getOrElse(Map.empty[String, Object])
    val consumer = new KafkaJavaConsumer[Key, Value](settingsMap.asJava, keyDeserializer, valueDeserializer)
    consumer.subscribe(Seq(configuration.topic).asJava)
    consumer
  }

我的 kafka 正在端口 6050 上运行,我已经在控制台中对其进行了测试,以从该特定端口进行生产和消费。我想知道我的问题是否与我上面的配置有关。我还使用 EmbeddedKafka 框架测试了上面的代码,问题似乎出在实际运行的 kafka 服务器上。

编辑:

我忘记补充一点,我有多个消费者(具有不同的 group.id's)从同一代理消费,不确定这是否是问题所在。

最佳答案

确保,

No. of partitions in the topic >= No. of consumer instances in the group

否则,组中的某些消费者实例将不会被分配任何分区。

要检查分区数量,请​​使用 kafka-topics.sh 命令

> sh kafka-topics.sh --zookeeper localhost:2181 --topic test --describe 主题:测试 PartitionCount:6 ReplicationFactor:1 配置: 主题:测试 分区:0 领导者:0 副本:0 Isr:0 主题:测试 分区:1 领导者:0 副本:0 Isr:0 主题:测试 分区:2 领导者:0 副本:0 Isr:0 主题:测试 分区:3 领导者:0 副本:0 Isr:0 主题:测试 分区:4 领导者:0 副本:0 Isr:0 主题:测试 分区:5 领导者:0 副本:0 Isr:0

关于java - Kafka消费者不消费,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38111628/

相关文章:

scala - 将 Enumerator[T] 转换为 List[T]

java - 使用 Spark Streaming 连接到 Cassandra 时出错

java - 如何使用 DaoAuthenticationProvider 以编程方式使用 Spring Security 对用户进行身份验证

java - 为什么这个多线程程序会陷入死循环?

scala - 如何复制实例并覆盖特征中声明的值字段

javascript - 我想将后端和前端分开,让它们使用HTTP消息进行通信。哪些技术可以帮助我实现这一目标?

apache-kafka - 在生产中使用kafka内的zookeeper可以吗?

hadoop - Camus Migration - Kafka HDFS Connect 不从设置的偏移量开始

java - 如何在不使用 JavascriptExecutor 的情况下在网页中向下/向上滚动

java - 过多的触摸事件会降低 Android 应用程序的速度