hadoop - Storm 中的 DRPC 服务器错误

标签 hadoop apache-storm trident

我正在尝试执行下面的代码并收到错误..不确定我是否遗漏了一些东西..还有我在哪里可以看到输出?

错误

java.lang.RuntimeException:没有为拓扑配置 DRPC 服务器 在 backtype.storm.drpc.DRPCSpout.open(DRPCSpout.java:79) 在 storm.trident.spout.RichSpoutBatchTriggerer.open(RichSpoutBatchTriggerer.java:58) 在 backtype.storm.daemon.executor$fn__5802$fn__5817.invoke(executor.clj:519) 在 backtype.storm.util$async_loop$fn__442.invoke(util.clj:434) 在 clojure.lang.AFn.run(AFn.java:24) 在 java.lang.Thread.run(Thread.java:744)

Code:
----
package com.**.trident.storm;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import storm.kafka.*;
import storm.trident.*;

import backtype.storm.*;

public class EventTridentDrpcTopology
{
private static final String KAFKA_SPOUT_ID = "kafkaSpout";  

private static final Logger log = LoggerFactory.getLogger(EventTridentDrpcTopology.class);

public static StormTopology buildTopology(OpaqueTridentKafkaSpout spout) throws Exception
{
    TridentTopology tridentTopology = new TridentTopology();
    TridentState ts = tridentTopology.newStream("event_spout",spout)
    .name(KAFKA_SPOUT_ID)
    .each(new Fields("mac_address"), new SplitMac(), new Fields("mac"))
    .groupBy(new Fields("mac"))
    .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("maccount"))
    .parallelismHint(4)
    ;

    tridentTopology
    .newDRPCStream("mac_count")
    .each(new Fields("args"), new SplitMac(), new Fields("mac"))
    .stateQuery(ts,new Fields("mac"),new MapGet(), new Fields("maccount"))
    .each(new Fields("maccount"), new FilterNull())
    .aggregate(new Fields("maccount"), new Sum(), new Fields("sum"))
     ;

return tridentTopology.build();

}

public static void main(String[] str) throws Exception
{
    Config conf = new Config();
    BrokerHosts hosts = new ZkHosts("xxxx:2181,xxxx:2181,xxxx:2181");
    String topic = "event";
    //String zkRoot = topologyConfig.getProperty("kafka.zkRoot");
    String consumerGroupId = "StormSpout";

    DRPCClient drpc = new DRPCClient("xxxx",3772);


    TridentKafkaConfig tridentKafkaConfig = new TridentKafkaConfig(hosts, topic, consumerGroupId);
    tridentKafkaConfig.scheme = new SchemeAsMultiScheme(new XScheme()); 
    OpaqueTridentKafkaSpout opaqueTridentKafkaSpout = new OpaqueTridentKafkaSpout(tridentKafkaConfig);


    StormSubmitter.submitTopology("event_trident", conf, buildTopology(opaqueTridentKafkaSpout));

}

}

最佳答案

您必须配置 DRPC 服务器的位置并启动它们。 请参阅 http://storm.apache.org/releases/0.10.0/Distributed-RPC.html 上的远程模式 DRPC

启动 DRPC 服务器 配置 DRPC 服务器的位置 提交 DRPC 拓扑到 Storm 集群 可以使用 storm 脚本启动 DRPC 服务器,就像启动 Nimbus 或 UI 一样:

bin/storm drpc

接下来,您需要配置 Storm 集群以了解 DRPC 服务器的位置。这就是 DRPCSpout 知道从哪里读取函数调用的方式。这可以通过 storm.yaml 文件或拓扑配置来完成。通过 storm.yaml 配置它看起来像这样:

drpc.servers: - “drpc1.foo.com” - “drpc2.foo.com”

关于hadoop - Storm 中的 DRPC 服务器错误,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27436606/

相关文章:

hadoop - 级联管道中 SQL NOT IN 的等价物是什么?

java - 在 java 中杀死 Storm 中的拓扑后的通知

hadoop - 在 HDFS 中使用 -addMount 时找不到类 org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem

java.lang.RuntimeException : java. lang.ClassCastException : [B cannot be cast to java. lang.String

java - 在 Bolt 构造函数中初始化的变量为 null

tuples - 如何使用storm Trident对元组进行批处理?

java - 在 Storm TrackedTopology 单元测试中运行 Trident 拓扑

apache-storm - Storm 与三叉戟 : When not to use Trident?

performance - 为什么使用通用mapreduce而不是 hive ?

java - slave VM 从 slaves 列表中删除,并且仍然被 Yarn/Tez 访问