scala - Kafka主题创建: Timed out waiting for a node assignment

标签 scala docker apache-kafka

我已经使用以下 docker-compose.yml 运行了一个本地 kafka

version: '2'
services:
  zookeeper:
    image: "confluentinc/cp-zookeeper:5.0.1"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: "confluentinc/cp-enterprise-kafka:5.0.1"
    ports:
      - '9092:9092'
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 100

尝试在 Scala 中使用 kafka-client 2.1.0 运行基本创建主题:

val props = new Properties()
props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")

val adminClient: AdminClient = AdminClient.create(props)
val newTopic = new NewTopic("test", 1, 1.toShort)
val topicsF = adminClient.createTopics(List(newTopic).asJavaCollection)
val result = topicsF.all().get()

但过了一段时间我得到:

org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.

我可以使用命令行创建主题:

kafka-topics --create \
    --zookeeper localhost:2181 \
    --replication-factor 1 \
    --partitions 1 \
    --topic test
Created topic "test".

kafka AdminClient API Timed out waiting for node assignment描述了一个使用 Java 的类似问题,但评论表明系统重启解决了这个问题,而我这边的情况并非如此。

最佳答案

如果您在 Docker(或类似)中运行 Kafka,则需要正确配置监听器。 This article详细介绍一下。

Here's an example一个 Docker Compose,您可以使用它从您的主机访问 Kafka。

免责声明:我写了这篇文章 :)

关于scala - Kafka主题创建: Timed out waiting for a node assignment,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53586130/

相关文章:

scala - SortedMap 中最近的键

docker push 到谷歌云 GCP 失败,名称未知 : Buckets

docker - 从Docker容器SSH出到局域网上的服务器

java - 带有 Exactly Once 处理的 Kafka 的 Spring Boot

apache-kafka - 自上次追加以来,卡夫卡生产者发送的消息已过期 30003 毫秒

scala - 如何使用 OverflowStrategy 打印流中删除的元素?

scala - 函数特征和隐式参数

scala - 如何将 Array[Long] 转换为 Scala 数据帧中的 Vector 类型?

docker - 如何使用 Travis-CI 在 CentOS 7 上运行测试?

elasticsearch - 如何扩展logstash节点?