I am trying to read messages from a Kafka queue using KafkaSpout. Either I get nothing at all, or I get the following error:
2 [Thread-10-kafka-storm-spout] ERROR util:0 - Async loop died!
java.lang.NoSuchMethodError: scala.Predef$.augmentString(Ljava/lang/String;)Ljava/lang/String;
at kafka.consumer.SimpleConsumer.<init>(SimpleConsumer.scala:38)
at kafka.javaapi.consumer.SimpleConsumer.<init>(SimpleConsumer.scala:34)
at storm.kafka.DynamicPartitionConnections.register(DynamicPartitionConnections.java:43)
at storm.kafka.PartitionManager.<init>(PartitionManager.java:57)
at storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:80)
at storm.kafka.ZkCoordinator.getMyManagedPartitions(ZkCoordinator.java:52)
at storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:118)
at backtype.storm.daemon.executor$fn__3284$fn__3299$fn__3328.invoke(executor.clj:563)
at backtype.storm.util$async_loop$fn__452.invoke(util.clj:431)
at clojure.lang.AFn.run(AFn.java:24)
at java.lang.Thread.run(Thread.java:744)
11 [Thread-10-kafka-storm-spout] ERROR executor:0 -
java.lang.NoSuchMethodError: scala.Predef$.augmentString(Ljava/lang/String;)Ljava/lang/String;
at kafka.consumer.SimpleConsumer.<init>(SimpleConsumer.scala:38)
at kafka.javaapi.consumer.SimpleConsumer.<init>(SimpleConsumer.scala:34)
at storm.kafka.DynamicPartitionConnections.register(DynamicPartitionConnections.java:43)
at storm.kafka.PartitionManager.<init>(PartitionManager.java:57)
at storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:80)
at storm.kafka.ZkCoordinator.getMyManagedPartitions(ZkCoordinator.java:52)
at storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:118)
at backtype.storm.daemon.executor$fn__3284$fn__3299$fn__3328.invoke(executor.clj:563)
at backtype.storm.util$async_loop$fn__452.invoke(util.clj:431)
at clojure.lang.AFn.run(AFn.java:24)
at java.lang.Thread.run(Thread.java:744)
Here is my code:
TopologyBuilder builder = new TopologyBuilder();
String TOPIC_NAME = "topic";
String spoutName = "kafka-storm-spout";
BrokerHosts brokerHosts = new ZkHosts("localhost:2181");
SpoutConfig kafkaConfig = new SpoutConfig(brokerHosts, TOPIC_NAME, "", "storm");
builder.setSpout(spoutName, new KafkaSpout(kafkaConfig), 1);
builder.setBolt("kafka-bolt", new TestBolt()).shuffleGrouping(spoutName);
Config config = new Config();
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("kafka-test", config, builder.createTopology());
System.out.println("Topology submitted");
Utils.sleep(5000);
System.out.println("Shutting down");
cluster.shutdown();
Any ideas?
Best answer
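You are most likely mixing Scala versions. The NoSuchMethodError on scala.Predef$.augmentString means the Kafka classes were compiled against a Scala library whose method signature does not exist in the scala-library jar actually loaded at runtime. Kafka is published in separate builds for different Scala versions ( https://kafka.apache.org/downloads.html ). Check your dependencies and make sure only one Scala version ends up on the classpath, and that it matches the version your Kafka artifact was built for.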
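One way to see which jars are actually being picked up is to ask the JVM where a class was loaded from. The following is only a minimal sketch: the class name ClasspathCheck and the helper locate are hypothetical names made up for this illustration, and it assumes you run it with the same classpath as the topology.

```java
import java.security.CodeSource;

public class ClasspathCheck {
    // Hypothetical helper: returns the jar (or directory) a class was loaded from,
    // or a short note when the class or its code source is unavailable.
    static String locate(String className) {
        try {
            // initialize=false: we only want to find the class, not run its static initializers
            Class<?> cls = Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            CodeSource src = cls.getProtectionDomain().getCodeSource();
            return src == null
                    ? "(no code source, e.g. JDK class)"
                    : String.valueOf(src.getLocation());
        } catch (ClassNotFoundException | LinkageError e) {
            return "(not on classpath)";
        }
    }

    public static void main(String[] args) {
        // The two classes named in the stack trace above
        System.out.println("scala.Predef -> " + locate("scala.Predef"));
        System.out.println("kafka.consumer.SimpleConsumer -> "
                + locate("kafka.consumer.SimpleConsumer"));
    }
}
```

If the scala-library jar printed for scala.Predef has a different Scala version than the suffix of the Kafka jar (for example kafka_2.10 next to a scala-library 2.9.x jar), that mismatch is the likely cause. If the project happens to be built with Maven, running mvn dependency:tree should also show which dependency pulls in each Scala version.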
Regarding "java - Unable to read data using StormSpout", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/28114553/