java - 在 Apache Storm bolt 中使用 Apache Camel ProducerTemplate

标签 java apache-camel apache-storm

我正在尝试编写简单的 Storm + Camel 项目。 我的 Storm 拓扑分析推文,一个 bolt 应该将推文文本发送到 apache camel 路由,而 apache camel 路由又使用 websocket 通知一些 webapp。

由于在尝试使用 build once CamelContext 时从 bolt 收到 NotSerializableExceptions,我无法使其工作。

我已经尝试过的:

  • 在 bolt 的构造函数中传递 CamelContext - 导致 NotSerializableException
  • 在 storm conf 中传递 CamelContext,并在 bolt 的 prepare(...) 方法中使用它来获取它。结果:

    14484 [main] 错误 org.apache.storm.zookeeper.server.NIOServerCnxnFactory - Thread Thread[main,5,main] 已死 java.lang.IllegalArgumentException:拓扑 conf 不是 json 可序列化的 在 backtype.storm.testing$submit_local_topology.invoke(testing.clj:262) ~[storm-core-0.9.4.jar:0.9.4] 在 backtype.storm.LocalCluster$_submitTopology.invoke(LocalCluster.clj:43) ~[storm-core-0.9.4.jar:0.9.4] 在 backtype.storm.LocalCluster.submitTopology(未知来源)~[storm-core-0.9.4.jar:0.9.4]

Camel 路线:

public class MyRouteBuilder extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        from("direct:main")
                .to("websocket:localhost:8085/main?sendToAll=true");
    }
}

Storm 拓扑: Tweet Spout 正在使用 twitter4j stremaing API 传播推文。

public class TwitterStreamTopology {

    public static void main(String[] args) {
        CamelContext producerTemplate = new RouteStarter().buildRoute();

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("tweetSpout", new TweetSpout(keywords), 1);
        builder.setBolt("websocket", new WebSocketBolt()).shuffleGrouping("tweetSpout");
        Config conf = new Config();
        conf.put("producerTemplate", producerTemplate.createProducerTemplate());
        conf.setDebug(true);

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("mytopology", conf, builder.createTopology());

        Utils.sleep(20000);
        cluster.shutdown();
    }
}

网络套接字 bolt :

public class WebSocketBolt extends BaseBasicBolt {
    private ProducerTemplate producerTemplate;

    @Override
    public void execute(Tuple input, BasicOutputCollector basicOutputCollector) {
        Status s = (Status) input.getValueByField("tweet");
        producerTemplate.sendBody("direct:main", s.getText());
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

    }

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        super.prepare(stormConf, context);
        this.producerTemplate = (ProducerTemplate) stormConf.get("producerTemplate");
    }
}

有没有办法很好地做到这一点?

或者我应该让 camel 路由被 http 访问,并在 bolt prepare(...) 方法中创建一些 HttpClient 吗?这看起来仍然有点矫枉过正,必须有一种方法让它变得更容易。

感谢大家的帮助!

最佳答案

您的问题的根本原因是您将 ProducerTemplate 添加到您的 Storm 配置中并且它抛出异常,因为它不可序列化。如果那是您自己的类(class),您可以更改代码以使其正常工作,但由于这是一个 Camel 类(class),我会推荐一种不同的方法。

  1. WebSocketBolt:将您的 producerTemplate private 成员更改为 transient:private transient ProducerTemplate producerTemplate; 这样它就不会尝试被序列化(与将它放入 conf 时遇到的问题相同)。
  2. WebSocketBolt:在您的准备方法中而不是在您的拓扑中初始化 producerTemplate。

像这样:

public class WebSocketBolt extends BaseBasicBolt {
    private transient ProducerTemplate producerTemplate;

    @Override
    public void execute(Tuple input, BasicOutputCollector basicOutputCollector) {
        Status s = (Status) input.getValueByField("tweet");
        producerTemplate.sendBody("direct:main", s.getText());
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

    }

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        super.prepare(stormConf, context);
        CamelContext producerTemplate = new RouteStarter().buildRoute();
        this.producerTemplate = producerTemplate.createProducerTemplate();
    }
}

关于java - 在 Apache Storm bolt 中使用 Apache Camel ProducerTemplate,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29913304/

相关文章:

java - 403 禁止将 wss4jOutInterceptor 与 Camel 和 cxf 一起使用

java - Camel ;简单的 seda 到 bean 路线不起作用

c++ - 如何在 Storm 和 Thrift 上使用 C++ spout/bolt 在 Storm 中使用

java - Android 音乐播放器在添加歌曲后不重新索引

java.rmi.UnmarshalException : unable to pull client classes by server

apache-camel - 我可以在同一个 Camel 上下文中的 Camel Routes 之间共享本地数据吗?

elasticsearch - 使用 elasticsearch-hadoop 库将元组从 storm 索引到 elasticsearch 不起作用

java - SimpleThreadPool 的 Quartz ClassNotFoundException

java - 如何通过索引获取 TreeMap 的键?

hadoop - Storm 输入和输出可能的来源