java - 在本地模式下运行多个storm拓扑

标签 java apache-storm

我的 main() 方法中有两个独立的拓扑。我使用“for 循环”来设置其相应的喷嘴和 bolt ,并提交该拓扑。 每个拓扑都有单独的一组 spout 和 Bolt,其中具有不同的逻辑。

现在我想动态调用不同的喷嘴和 bolt 。我该怎么做??

主方法中的代码:

    public static void main(String[] args) throws InterruptedException,Exception            
    {
    LocalCluster cluster = new LocalCluster();



    try{
        BufferedReader in=new BufferedReader(new FileReader("/home/praveen /workspace/OfferEngine/OfferListJSON.json"));
        //ArrayList<String> content = new ArrayList<String>();
        String str="";
        String str1="";
        while((str1=in.readLine())!=null)
        {
            str = str + str1;
        }

         JsonParser parser = new JsonParser();
         JsonObject o = (JsonObject)parser.parse(str);
         JsonObject o2 = (JsonObject)o.get("OfferData");

         ArrayList<String> ol = new ArrayList<String>();
         ol.add(o2.get("strOfferId").toString().replaceAll("^\"|\"$", ""));
         ol.add(o2.get("strOfferId1").toString().replaceAll("^\"|\"$", ""));

         TopologyBuilder bu = null;
         Config config = new Config();
         config.setDebug(false);

         for(String x : ol) {
                System.out.println(x); //prints element x
                String y="", z="";
                bu = new TopologyBuilder();         
                bu.setSpout(x, new Off1Spout(x).ks(), 2);
                y = x+"2";
                bu.setBolt(y, new main.java.bolts.Off1Bolt()).shuffleGrouping(x);
                z = x+"offerlimit";
                bu.setBolt(z, new OfferLimit()).shuffleGrouping(y);
cluster.submitTopology(x,config,bu.createTopology());
              }
    }

    catch (IOException e) 
    {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
}

任何形式的建议都更有帮助......谢谢问候

最佳答案

您可以在 for 循环中尝试 if() 条件以获得不同的优惠...请参阅以下代码供您引用

for(String x : ol) {
     System.out.println(x); 
//  Offer 1 related Topology
    if(x.equalsIgnoreCase("offer1")){                   
       String y="", z="";
       bu = new TopologyBuilder();          
       bu.setSpout();
       y = x+"start";
       cluster.submitTopology(x,config,bu.createTopology());
    }

//      Offer 2 related Topology

 if(x.equalsIgnoreCase("off2")){

       String y="", z="";
       bu = new TopologyBuilder();          
       bu.setSpout(x, new Off1Spout(x).ks(), 2);
       y = x+"start";
       bu.setBolt(y, new off2Bolt()).shuffleGrouping(x);
       cluster.submitTopology(x,config,bu.createTopology());
  }
}

有多少拓扑会创建那么多 if 条件..

关于java - 在本地模式下运行多个storm拓扑,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/19090610/

相关文章:

java - NoSuchMethodError : org. slf4j

mongodb - Apache Storm和 bolt 将数据存储在管道中

java - J2ME - 无法关闭客户端的输出流,服务器挂起等待数据 [nokia J2ME 实现]

java - 如何拉伸(stretch)ImageView中的图像?

java - BigQuery - 如何在 Java 客户端库中设置读取超时

java - 在Java小程序中调用无限循环

java - Qt C++ 如何通过android原生播放器播放视频

apache-storm - Storm - DRPC 与事务型与 Trident - 何时使用什么?

elasticsearch - 使用Kafka,Storm和Elasticsearch构建数据管道

apache-storm - Apache Kafka 与 Apache Storm