目标:我正在尝试从 java cleint 向 kafka 发送消息。
这一直是一种痛苦..
让我简单描述一下。
- 我已在 centos 虚拟机上安装了 kafka。
- 我运行了它附带的zookeeper、服务器、生产者和客户端以及所有默认属性文件。
我成功发送和接收消息。 - 我的计算机与虚拟机上的 Zookeeper (2181) 端口和 kafka 服务器 (9092) 端口有 Telnet 连接。
现在,我想编写java代码来向该主题发送消息。 我使用网站快速入门中的示例:
Properties props = new Properties();
props.put("zk.connect", "http://XX.XX.XX.XX:2181"); // where X is the ip
props.put("serializer.class", "kafka.serializer.StringEncoder");
producer = new Producer<String, String>(new ProducerConfig(props));
它在第四行失败,但有以下异常(exception):
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries
和
rg.I0Itec.zkclient.exception.ZkTimeoutException: Unable to connect to zookeeper server within timeout: 400
问题。
异常(exception)。
不良参数:
在kafka快速启动示例中我看到它只需要 zk.connect, serializer.class.当我运行它时,它大喊它需要在生产者的构造函数中使用metadata.broker.list。
可以?这样我就感受到了kafka服务器的ip和端口。
顺便说一句 - 是zk.connect还是zookeeper连接?ZkTimeoutException:无法在超时内连接到zookeeper服务器:400maven 错误版本
我访问该网站,看到最新版本是 kafka_2.8.0-0.8.0。
问题 1 - 我使用 intelij 下载它(我认为它是 Maven Central)-
我得到了所有相关的 jar - 只是卡夫卡 jar 是空的(仅包含 list )。问题 2 - 有比网站中的版本更高的版本。他们是官方的吗?
无论如何,我下载了 org.apache.kafka:kafka_2.10:0.8.0使用wireshark,我看到三个SYN和ACK(三次握手),然后是FYN和ACK
就在之后。在动物园管理员的日志中我看到以下内容 [2014-02-27 01:43:42,127] WARN EndOfStreamException:无法从客户端 sessionid 0x0 读取附加数据,可能客户端已关闭套接字
(org.apache.zookeeper.server.NIOServerCnxn)
这意味着我关闭了连接。为什么?
最佳答案
Kafka 0.8 不需要 zk.connect
参数。
您可能已经关注了0.7 quickstart dodumentation 。检查您正在使用的版本,即使您使用的是旧的 0.7 发行版,您也不需要
在您的 zk.connect
属性中需要 "http" 。将其删除为 "xx.xxx.xx.xx:9092"
...
应该只是
props.put(“zk.connect”, “127.0.0.1:2181”);
0.8 关注 this链接,生产者在这两个不同版本中的工作方式几乎没有变化。 如果您是 Kafka 新手,那么您应该使用最新版本,因为其中有很多修复
关于maven - 无法从 java 客户端连接到我的虚拟机上的 kafka,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/22068127/