docker - 卡夫卡消费者无法连接

标签 docker apache-kafka kafka-consumer-api

我在docker中运行kafka和zookeeper。

CONTAINER ID        IMAGE                 COMMAND                  CREATED             STATUS              PORTS                                                                    NAMES
34a113626eb6        confluent/kafka       "/usr/local/bin/ka..."   11 minutes ago      Up 11 minutes       0.0.0.0:9092->9092/tcp                                                   kafka
552d1d787b9e        confluent/zookeeper   "/usr/local/bin/zk..."   12 minutes ago      Up 12 minutes       0.0.0.0:2181->2181/tcp, 0.0.0.0:2888->2888/tcp, 0.0.0.0:3888->3888/tcp   zookeeper

但是,当我使用kafka命令行发送消费者消息时,我看到了此错误:
iMac:kafka lluo$ bin/kafka-console-consumer.sh --zookeeper `docker-machine ip bigdata`:2181 --topic bigdata
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
[2018-10-09 09:09:05,504] WARN [ConsumerFetcherThread-console-consumer-62804_Russells-iMac.hsd1.ca.comcast.net-1539101285161-d8affa4e-0-0]: Error in fetch to broker 0, request Name: FetchRequest; Version: 3; CorrelationId: 0; ClientId: console-consumer-62804; ReplicaId: -1; MaxWait: 100 ms; MinBytes: 1 bytes; MaxBytes:2147483647 bytes; RequestInfo: ([bigdata,0],PartitionFetchInfo(5,1048576)) (kafka.consumer.ConsumerFetcherThread)
java.net.SocketTimeoutException
    at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:211)
    at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
    at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
    at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:85)
    at kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:131)
    at kafka.network.BlockingChannel.receive(BlockingChannel.scala:122)
    at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:102)
    at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:86)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:135)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:135)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:135)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:134)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:134)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:134)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31)
    at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:133)
    at kafka.consumer.ConsumerFetcherThread.fetch(ConsumerFetcherThread.scala:114)
    at kafka.consumer.ConsumerFetcherThread.fetch(ConsumerFetcherThread.scala:35)
    at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:151)
    at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:112)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64)

有人解决过这个问题吗?

最佳答案

代替zookeeper,您需要使用bootstrap-server(默认端口9092)。
您收到的警告清楚地表明您正在使用旧的已弃用的使用者。

Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].

由于Kafka已将偏移量存储从Zookeeper移至Brokers,因此Kafka使用者需要bootstrap-server参数,因为它需要连接到Kafka Brokers,因此,无需直接与Zookeeper通信。

关于docker - 卡夫卡消费者无法连接,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52725367/

相关文章:

java - 为什么 Producer.send(record).get() 有效,但 Producer.send(record, callback) 不起作用

docker - 如何防止 docker build 重新下载复制的 Go vendor

ssl - Docker 群 TLS 无法验证挂起的节点

在客户端重新启动之前,无法使用 docker-compose 访问 Spring Config 服务器

apache-kafka - 理解kafka、消费者群体和主题

apache-kafka - Kafka 命令行生产者/消费者有 1 秒的延迟

node.js - 如何将 Node docker 容器与 postgres docker 容器连接

apache-spark - Spark-Streaming 最早在 kafka 开始偏移时挂起(Kafka 2,spark 2.4.3)

java - 如何使用 Java 从 Kafka 中删除 ACLS

apache-kafka - kafka + Consumer Group的根本原因应该是重新平衡