我正在尝试将 Kafka 与 Storm 集成。我正在使用 Kafka Spout 从 Kafka 主题检索数据并将其提供给 Storm Bolt 进行进一步处理。我能够成功提交拓扑,但 Spout 没有发出任何数据。它也不会抛出任何错误。我对 Kafka 和 Storm 很陌生。所以,我无法弄清楚这个问题背后的原因。请提出修改建议。提前致谢!!
我的拓扑:
public class TopologyMain {
private static final String SENTENCE_SPOUT_ID = "kafka-sentence-spout";
public static void main(String[] args) throws InterruptedException, AlreadyAliveException, InvalidTopologyException {
int numSpoutExecutors = 1;
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(SENTENCE_SPOUT_ID, buildKafkaSentenceSpout(), numSpoutExecutors);
builder.setBolt("word-normalizer", new WordNormalizer())
.shuffleGrouping(SENTENCE_SPOUT_ID);
builder.setBolt("word-counter", new WordCounter(),2)
.shuffleGrouping("word-normalizer");
//Configuration
Config conf = new Config();
conf.setDebug(false);
//Topology run
conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
conf.put(Config.NIMBUS_HOST, "192.168.1.229");
conf.put(Config.NIMBUS_THRIFT_PORT, 6627);
System.setProperty("storm.jar", "/home/ubuntu/st/stIn/target/storm-wc.jar");
StormSubmitter.submitTopology("Count-Word-Topology", conf,builder.createTopology());
}
private static KafkaSpout buildKafkaSentenceSpout() {
BrokerHosts hosts = new ZkHosts("localhost:2181");
SpoutConfig spoutConfig = new SpoutConfig(hosts, "test", "/acking-kafka-sentence-spout", "acking-sentence-spout");
spoutConfig.forceFromStart = true;
spoutConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
return new KafkaSpout(spoutConfig);
}
}
最佳答案
我明确地将项目的 Maven 依赖项中的所有 jar 文件复制到 Storm 库,一切工作正常。此外,我还将 Storm jar(用于提交拓扑的 jar)复制到 Storm/lib。
关于java - kafka spout不发出数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30437382/