java - nextTuple() 在 Storm 上使用 BaseRichSpout 被无限次调用

标签 java bigdata apache-storm

我实现了简单的 Storm 拓扑结构,它有一个 spout 和一个在本地集群模式下运行的 bolt。

出于某种原因,spout 的 nextTuple() 被调用了不止一次。

知道为什么吗?

代码:

喷口:

public class CommitFeedListener extends BaseRichSpout {
    private SpoutOutputCollector outputCollector;
    private List<String> commits;

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("commit"));
    }

    @Override
    public void open(Map configMap,
                     TopologyContext context,
                     SpoutOutputCollector outputCollector) {
        this.outputCollector = outputCollector;
    }

    **//that method is invoked more than once**
    @Override
    public void nextTuple() {

            outputCollector.emit(new Values("testValue"));

    }
}

bolt :

public class EmailExtractor extends BaseBasicBolt {
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("email"));
    }
    @Override
    public void execute(Tuple tuple,
                        BasicOutputCollector outputCollector) {
        String commit = tuple.getStringByField("commit");
        System.out.println(commit);        
    }  
}

运行配置:

public class LocalTopologyRunner {
    private static final int TEN_MINUTES = 600000;
    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("commit-feed-listener", new CommitFeedListener());
                builder
        .setBolt("email-extractor", new EmailExtractor())
                .shuffleGrouping("commit-feed-listener");
        Config config = new Config();
        config.setDebug(true);
        StormTopology topology = builder.createTopology();
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("github-commit-count-topology",
                config,
                topology);
        Utils.sleep(TEN_MINUTES);
        cluster.killTopology("github-commit-count");
        cluster.shutdown();
    }
}

谢谢大家, 射线。

最佳答案

nextTuple() 按照设计在无限循环中调用。这样做是为了使用例如针对外部资源(数据库、流、IO 等)的脏检查。

如果你在 nextTuple() 中无事可做,你应该睡一会儿以防止 CPU 使用 backtype.storm.utils.Utils 发送垃圾邮件

Utils.sleep(pollIntervalInMilliseconds);

Storm 是一种实时处理架构,因此它确实是正确的行为。检查一些示例以了解如何根据您的需要实现 spout。

关于java - nextTuple() 在 Storm 上使用 BaseRichSpout 被无限次调用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27354438/

相关文章:

hadoop - 通过Informatica Big Data Edition创建配置单元表

来自大数据的 Clojure 频率字典

java - 错误 backtype.storm.daemon.executor - java.lang.NoClassDefFoundError : org/I0Itec/zkclient/serialize/ZkSerializer

java - 检查链表是否是回文-我在这里缺少什么?

java - 嵌入式 ActiveMQ 代理需要哪些依赖项?

java - HTTP POST 将 @ 转换为 %40

java - 这个迭代在java中如何完成?

python - 在无法容纳内存的大文件中查找字符串的出现

java - 流关闭错误 - Storm

apache-storm - 了解Storm消息的处理流程