apache-storm - Storm fields grouping example

Tags: apache-storm

I am using Kafka with Storm: Kafka sends/emits JSON strings into Storm, and inside Storm I want to distribute the load across several workers based on a key/field in the JSON. How can I do that? In my case it is the groupid field in the JSON string.

For example, I have JSON like this:

{groupid: 1234, userid: 145, comments:"I want to distribute all this group 1234  to one worker", size:50,type:"group json"}
{groupid: 1235, userid: 134, comments:"I want to distribute all this group 1235 to another worker", size:90,type:"group json"}
{groupid: 1234, userid: 158, comments:"I want to be sent to same worker as group 1234", size:50,type:"group json"}   

===== Storm 0.9.4 is used =====

My source code is as follows:

import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;


public class KafkaBoltMain {
   private static final String SPOUTNAME="TopicSpout"; 
   private static final String ANALYSISBOLT = "AnalysisWorker";
   private static final String CLIENTID = "Storm";
   private static final String TOPOLOGYNAME = "LocalTopology";


   private static class AppAnalysisBolt extends BaseRichBolt {
       private static final long serialVersionUID = -6885792881303198646L;
       private OutputCollector _collector;
       private long groupid = -1L;
       private String log = "test";

       public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
           _collector = collector;
       }

       public void execute(Tuple tuple) {
           List<Object> objs = tuple.getValues();
           int i = 0;
           for (Object obj : objs) {
               System.out.println("" + i + "th object's value is:" + obj.toString());
               i++;
           }

//         _collector.emit(new Values(groupid, log));
           _collector.ack(tuple);
       }

       public void declareOutputFields(OutputFieldsDeclarer declarer) {
           declarer.declare(new Fields("groupid", "log"));
       }
   }

   public static void main(String[] args) {
       String zookeepers = null;
       String topicName = null;
       if (args.length == 2) {
           zookeepers = args[0];
           topicName = args[1];
       } else if (args.length == 1 && args[0].equalsIgnoreCase("help")) {
           System.out.println("xxxx");
           System.exit(0);
       } else {
           System.out.println("You need to have two arguments: kafka zookeeper:port and topic name");
           System.out.println("xxxx");
           System.exit(-1);
       }

        SpoutConfig spoutConfig = new SpoutConfig(new ZkHosts(zookeepers),
                topicName,
                "",// zookeeper root path for offset storing
                CLIENTID);
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(SPOUTNAME, kafkaSpout, 1);
        builder.setBolt(ANALYSISBOLT, new AppAnalysisBolt(),2)
            .fieldsGrouping(SPOUTNAME,new Fields("groupid"));

        //Configuration
        Config conf = new Config();
        conf.setDebug(false);
        //Topology run
        conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology(TOPOLOGYNAME, conf, builder.createTopology());
    }
}

But when I submit the topology, it gives the following error:

12794 [main] WARN  backtype.storm.daemon.nimbus - Topology submission exception. (topology name='LocalTopology') #<InvalidTopologyException InvalidTopologyException(msg:Component:
 [AnalysisWorker] subscribes from stream: [default] of component [TopicSpout] with non-existent fields: #{"groupid"})>
12800 [main] ERROR org.apache.storm.zookeeper.server.NIOServerCnxnFactory - Thread Thread[main,5,main] died
backtype.storm.generated.InvalidTopologyException: null

Why am I getting this warning about non-existent fields? Any hints?

Best Answer

You need to extract the groupId attribute from the JSON object and pass both values (the JSON object and the groupId string) along as a two-value tuple. When you declare the stream as part of the topology specification logic, you name that second field "groupId", and everything should work. If you don't want to modify the Kafka spout, you need an intermediary bolt whose sole purpose is to split the groupId out of the JSON object. That intermediary bolt could also use a direct stream (the emitDirect() method), choosing the target task based on the groupId in the JSON object.
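Since StringScheme hands the bolt the whole Kafka message as one raw string, the heart of such an intermediary bolt is just pulling groupid out of the payload before re-emitting a two-value tuple. A minimal, self-contained sketch of that extraction (regex-based rather than a real JSON parser, because the sample payload uses unquoted keys; the class and method names here are illustrative, not from the original code):

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class GroupIdSplitter {
    // The sample payload has unquoted keys, so a strict JSON parser would
    // reject it; a regex keeps this sketch dependency-free. In a production
    // bolt you would parse with a proper JSON library instead.
    private static final Pattern GROUP_ID = Pattern.compile("groupid\\s*:\\s*(\\d+)");

    /** Returns the groupid value from the payload, or -1 if none is found. */
    public static long extractGroupId(String payload) {
        Matcher m = GROUP_ID.matcher(payload);
        return m.find() ? Long.parseLong(m.group(1)) : -1L;
    }

    public static void main(String[] args) {
        String payload = "{groupid: 1234, userid: 145, comments:\"...\", size:50,type:\"group json\"}";
        System.out.println(extractGroupId(payload)); // prints 1234
        // Inside the intermediary bolt's execute(), you would then emit the
        // groupid alongside the raw payload so fieldsGrouping can key on it:
        //   collector.emit(new Values(extractGroupId(payload), payload));
        // and declare the output as:
        //   declarer.declare(new Fields("groupid", "log"));
    }
}
```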

That's one of the reasons I don't reuse the Kafka spout: I usually want to do something other than blindly writing the data to a stream, but that's neither here nor there.
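Putting it together, the topology wiring changes from spout-to-bolt directly into spout, splitter, then the analysis bolt. A sketch of that wiring fragment, assuming a hypothetical JsonSplitterBolt that declares and emits ("groupid", "log") tuples (the bolt name and parallelism here are illustrative):

```java
// Hypothetical wiring: the splitter sits between the Kafka spout and the
// analysis bolt. fieldsGrouping now subscribes to the splitter's stream,
// which actually declares a "groupid" field, so the
// InvalidTopologyException ("non-existent fields") goes away.
builder.setSpout(SPOUTNAME, kafkaSpout, 1);
builder.setBolt("JsonSplitter", new JsonSplitterBolt(), 1)
       .shuffleGrouping(SPOUTNAME);
builder.setBolt(ANALYSISBOLT, new AppAnalysisBolt(), 2)
       .fieldsGrouping("JsonSplitter", new Fields("groupid"));
```

Note that the original error occurs because StringScheme declares only its own single string field on the spout's default stream, so fieldsGrouping on "groupid" directly against the spout has nothing to match.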

Regarding apache-storm - Storm fields grouping example, a similar question was found on Stack Overflow: https://stackoverflow.com/questions/29998310/
