java - kafka spout不发出数据

标签 java apache-kafka apache-storm kafka-consumer-api

我正在尝试将 Kafka 与 Storm 集成。我正在使用 Kafka Spout 从 Kafka 主题检索数据并将其提供给 Storm Bolt 进行进一步处理。我能够成功提交拓扑,但 Spout 没有发出任何数据。它也不会抛出任何错误。我对 Kafka 和 Storm 很陌生。所以,我无法弄清楚这个问题背后的原因。请提出修改建议。提前致谢!!

Screen Shot of Storm UI after submitting the topology

我的拓扑:

public class TopologyMain {

 private static final String SENTENCE_SPOUT_ID = "kafka-sentence-spout";


public static void main(String[] args) throws InterruptedException, AlreadyAliveException, InvalidTopologyException {
    int numSpoutExecutors = 1;


    TopologyBuilder builder = new TopologyBuilder();

    builder.setSpout(SENTENCE_SPOUT_ID, buildKafkaSentenceSpout(), numSpoutExecutors);
    builder.setBolt("word-normalizer", new WordNormalizer())
        .shuffleGrouping(SENTENCE_SPOUT_ID);
    builder.setBolt("word-counter", new WordCounter(),2)
        .shuffleGrouping("word-normalizer");

    //Configuration
    Config conf = new Config();
    conf.setDebug(false);
    //Topology run
    conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
    conf.put(Config.NIMBUS_HOST, "192.168.1.229");
    conf.put(Config.NIMBUS_THRIFT_PORT, 6627);
    System.setProperty("storm.jar", "/home/ubuntu/st/stIn/target/storm-wc.jar");
    StormSubmitter.submitTopology("Count-Word-Topology", conf,builder.createTopology());

}



 private static KafkaSpout buildKafkaSentenceSpout() {
      BrokerHosts hosts = new ZkHosts("localhost:2181");
      SpoutConfig spoutConfig = new SpoutConfig(hosts, "test", "/acking-kafka-sentence-spout", "acking-sentence-spout");
      spoutConfig.forceFromStart = true;
      spoutConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
      return new KafkaSpout(spoutConfig);
    }
 }

最佳答案

我明确地将项目的 Maven 依赖项中的所有 jar 文件复制到 Storm 库,一切工作正常。此外,我还将 Storm jar(用于提交拓扑的 jar)复制到 Storm/lib。

关于java - kafka spout不发出数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30437382/

相关文章:

javascript - javamail.setFrom() 在我的 spring 项目中不起作用

apache-kafka - kafka集群扩容通用步骤

java - 我的 Kafka 自定义分区器类中出现错误

java - 如何通过 KafkaAdminClient 删除主题配置

java - Servlet注释到xml中

java - 无法设置运行java文件的值

java - dis.readchar 中文字母 - 错误解释的字符!

java.util.Properties$LineReader.readLine 处的 java.lang.NullPointerException

clojure - 在 Clojure 中正确导入 Apache Storm 依赖项

apache-storm - Apache Apex与Apache Storm有何不同?