java - 使用 Kafka Spout 集成 Kafka Storm

标签 java apache-kafka apache-storm

我正在使用 KafkaSpout。请在下面找到测试程序。

我正在使用 Storm 0.8.1。 Storm 0.8.2 中有 Multischeme 类。我会用那个。我只想知道早期版本是如何仅通过实例化 StringScheme() 类来工作的?在哪里可以下载早期版本的 Kafka Spout?但我怀疑这不是在 Storm 0.8.2 上工作的正确选择。 ??? (困惑)

当我在 Storm 集群上运行代码(下面给出)时(即当我推送我的拓扑时)我得到以下错误(当 Scheme 部分被注释时会发生这种情况,当然我会得到编译器错误,因为类不是在 0.8.1 中):

java.lang.NoClassDefFoundError: backtype/storm/spout/MultiScheme
        at storm.kafka.TestTopology.main(TestTopology.java:37)
Caused by: java.lang.ClassNotFoundException: backtype.storm.spout.MultiScheme

在下面给出的代码中,您可能会发现 spoutConfig.scheme=new StringScheme();部分评论。如果我不评论该行,我会收到编译器错误,但很自然,因为那里没有构造函数。此外,当我实例化 MultiScheme 时出现错误,因为我在 0.8.1 中没有该类。

public class TestTopology {
    public static class PrinterBolt extends BaseBasicBolt {
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
        }

        public void execute(Tuple tuple, BasicOutputCollector collector) {
            System.out.println(tuple.toString());
        }
    }

    public static void main(String [] args) throws Exception {
        List<HostPort> hosts = new ArrayList<HostPort>();
        hosts.add(new HostPort("127.0.0.1",9092));
        LocalCluster cluster = new LocalCluster();
        TopologyBuilder builder = new TopologyBuilder();
        SpoutConfig spoutConfig = new SpoutConfig(new KafkaConfig.StaticHosts(hosts, 1), "test", "/zkRootStorm", "STORM-ID");
        spoutConfig.zkServers=ImmutableList.of("localhost");
        spoutConfig.zkPort=2181;
        //spoutConfig.scheme=new StringScheme();
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        builder.setSpout("spout",new KafkaSpout(spoutConfig));
        builder.setBolt("printer", new PrinterBolt())
                .shuffleGrouping("spout");
        Config config = new Config();

        cluster.submitTopology("kafka-test", config, builder.createTopology());

        Thread.sleep(600000);
    }

最佳答案

我遇到了同样的问题。终于解决了,我把完整的运行示例放在了github上。

欢迎您在这里查看> https://github.com/buildlackey/cep

(单击 storm+kafka 目录以获取应该让您启动并运行的示例程序)。

关于java - 使用 Kafka Spout 集成 Kafka Storm,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/17342454/

相关文章:

java - 如何设置组件的DataFlavor?

elasticsearch - Logstash 和卡夫卡

java - storm hdfs connector ...尝试使用storm将数据写入hdfs

hadoop - apache storm 是否允许处理存储在 HDFS 上的大量文件?

java - 获取 Guice 对象图中的对象

java正则表达式不能使用符号?和 {n} 一起

java - 将 FormData 对象读入 Java HTTP 触发的 Azure Functions

hadoop - 使用Kafka Connect HDFS时,AccessControlException用户=根,访问=写…

java - 一次连续向10000个客户端发布数据

netty - Spark 是如何使用 Netty 的?