apache - 无法从 Kafka 生产者连接到远程动物园管理员

标签 apache timeout distributed apache-zookeeper

我已经玩了几天 Apache Kafka,这是我的问题,
如果我设置了网站“快速入门”部分中描述的本地测试,则一切正常,kafka 生产者/消费者、zookeeper 服务器和 kafka 代理完美运行。

现在,如果我在远程服务器上运行(我们称之为 node2):
- 动物园管理员 - 端口 2181
- 卡夫卡经纪人 - 端口 9092
- 卡夫卡消费者

然后,如果我从本地计算机运行:
- 卡夫卡制作人

假设node2上没有防火墙。
连接以超时结束。

这是错误日志:

/etc/java/jdk1.6.0_41/bin/java -Didea.launcher.port=7533 -Didea.launcher.bin.path=/home/kevin/Documents/idea-IU-123.169/bin -Dfile.encoding=UTF-8 -classpath /etc/java/jdk1.6.0_41/lib/dt.jar:/etc/java/jdk1.6.0_41/lib/tools.jar:/etc/java/jdk1.6.0_41/lib/jconsole.jar:/etc/java/jdk1.6.0_41/lib/htmlconverter.jar:/etc/java/jdk1.6.0_41/lib/sa-jdi.jar:/home/kevin/Desktop/kafka-0.7.2/examples/target/scala_2.8.0/classes:/home/kevin/Desktop/kafka-0.7.2/project/boot/scala-2.8.0/lib/scala-compiler.jar:/home/kevin/Desktop/kafka-0.7.2/project/boot/scala-2.8.0/lib/scala-library.jar:/home/kevin/Desktop/kafka-0.7.2/core/target/scala_2.8.0/classes:/home/kevin/Desktop/kafka-0.7.2/core/lib_managed/scala_2.8.0/compile/jopt-simple-3.2.jar:/home/kevin/Desktop/kafka-0.7.2/core/lib_managed/scala_2.8.0/compile/log4j-1.2.15.jar:/home/kevin/Desktop/kafka-0.7.2/core/lib_managed/scala_2.8.0/compile/zookeeper-3.3.4.jar:/home/kevin/Desktop/kafka-0.7.2/core/lib_managed/scala_2.8.0/compile/zkclient-0.1.jar:/home/kevin/Desktop/kafka-0.7.2/core/lib_managed/scala_2.8.0/compile/snappy-java-1.0.4.1.jar:/home/kevin/Desktop/kafka-0.7.2/examples/lib_managed/scala_2.8.0/compile/jopt-simple-3.2.jar:/home/kevin/Desktop/kafka-0.7.2/examples/lib_managed/scala_2.8.0/compile/log4j-1.2.15.jar:/home/kevin/Documents/idea-IU-123.169/lib/idea_rt.jar com.intellij.rt.execution.application.AppMain kafka.examples.KafkaConsumerProducerDemo
log4j:WARN No appenders could be found for logger (org.I0Itec.zkclient.ZkConnection).
log4j:WARN Please initialize the log4j system properly.
Exception in thread "Thread-0" java.net.ConnectException: Connection timed out
    at sun.nio.ch.Net.connect(Native Method)
    at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:532)
    at kafka.producer.SyncProducer.connect(SyncProducer.scala:173)
    at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:196)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:92)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:125)
    at kafka.producer.ProducerPool$$anonfun$send$1.apply$mcVI$sp(ProducerPool.scala:114)
    at kafka.producer.ProducerPool$$anonfun$send$1.apply(ProducerPool.scala:100)
    at kafka.producer.ProducerPool$$anonfun$send$1.apply(ProducerPool.scala:100)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
    at kafka.producer.ProducerPool.send(ProducerPool.scala:100)
    at kafka.producer.Producer.zkSend(Producer.scala:137)
    at kafka.producer.Producer.send(Producer.scala:99)
    at kafka.javaapi.producer.Producer.send(Producer.scala:103)
    at kafka.examples.Producer.run(Producer.java:53)

Process finished with exit code 0

这是我的生产者代码:
import java.util.Properties;
import kafka.javaapi.producer.ProducerData;
import kafka.producer.ProducerConfig;


public class Producer extends Thread{

  private final kafka.javaapi.producer.Producer<String, String> producer;
  private final String topic;
  private final Properties props = new Properties();

  public Producer(String topic)
  {
    props.put("zk.connect", "node2:2181");
    props.put("connect.timeout.ms", "5000");
    props.put("socket.timeout.ms", "30000");
    props.put("serializer.class", "kafka.serializer.StringEncoder");
    props.put("producer.type", "sync");
    props.put("conpression.codec", "0");
    producer = new kafka.javaapi.producer.Producer<String, String>(new ProducerConfig(props));
    this.topic = topic;
  }

  public void run() {
      String messageStr = new String("Message_test");
      producer.send(new ProducerData<String, String>(topic, messageStr));
  }
}

**所以我也测试了切换
props.put("zk.connect", "node2:2181");

经过
props.put("broker.list", "0:node2:9082");

在这种情况下,我可以成功连接。**

最佳答案

参见 http://kafka.apache.org/faq.html 中的第 3 项

解决方法是在 Kafka 的 server.properties 中显式设置主机名属性

您可以使用 Zookeeper 验证这一点。如果您使用的是 kafka 0.7*,请打开 ZkCli 控制台并获取/brokers/ids/0 并且您应该获取所有代理元数据。确保此处的 IP 地址/主机名与您在生产者代码中使用的 Zk 连接字符串匹配 -

props.put("zk.connect", "node2:2181");

就我而言,我使用的是在连接到 ubuntu VM(相同的机器,不同的 IP)的本地机器上运行的生产者,这种解决方法有所帮助。

关于apache - 无法从 Kafka 生产者连接到远程动物园管理员,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/15209361/

相关文章:

apache - tomcat是否可以通过过滤主机别名进行ssl重定向?

Cassandra 超时?

timeout - 在 Apache JMeter 中配置响应超时

testing - JMeter分布式测试

mongodb - 正在寻找所有节点都可读/写的分布式/可扩展数据库解决方案?不是 MongoDB?

ruby - 我可以使用 Ruby 通过 TCP 发送对象吗?

linux - Assets 的最佳可扩展性网络服务器是什么?

php - SSL 到非 SSL htaccess 重定向的问题

apache - 在 centos apache 中安装 mod_brotli

python - signal.SIGALRM 与 uwsgi