我将 Apache Storm 1.0.3 与 Apache Kafka_2.11-0.10.1.0 集成。 Storm 正确地从 kafka 主题读取一两条消息,但是当第一个 Bolt 确认元组时,此确认不会显示在 Storm UI 中。有什么问题吗?
其他问题: 当 Storm 从 kafka 主题读取 10 条或 19 条消息时,此时 Storm UI 显示 Bolt 已确认 20 条消息,如果读取其他 19 条已确认消息组,则添加 20 条以上消息。我不明白为什么 Storm UI 显示 Spout 和 Bolts 的 acked 为 20 中的 20。 任何人都可以解释一下Storm Ui控制台中acked和fail的注册逻辑是什么?
我的拓扑的配置是:
final TopologyBuilder myTopology = new TopologyBuilder();
KafkaConfiguration kconfig = new KafkaConfiguration();
SpoutConfig spout = kconfig.getKafkaConfiguration( args[0], args[1], args[2], args[3]);
myTopology.setSpout("spoutMvClient", new KafkaSpout(spout), 5);
myTopology.setBolt("boltTransformToObject", new TransformBolt(),7).globalGrouping("spoutMvClient");
myTopology.setBolt("boltMVClient", new MvClientBolt(), 6).fieldsGrouping("boltTransformToObject",new Fields("objectTarget"));
Config conf = new Config();
conf.setMaxSpoutPending(5000);
try {
StormSubmitter.submitTopology( "topologyOne", conf, myTopology.createTopology());
} catch (AlreadyAliveException e) {
e.printStackTrace();
}
我的第一个 TransformBolt Bolt 是:
public void execute(Tuple input) {
try {
LOG.info(input.getString(0));
Transform transform = new Transform();
OpenTarget openTarget = transform.getObjetGenericFromFileXml(input.getString(0));
collector.emit(input, new Values(openTarget));
collector.ack(input);
} catch (Exception e) {
LOG.error(e.getMessage());
collector.fail(input);
}
}
最佳答案
经过调查,我了解了 Storm 如何通过 Storm UI 确认他们的元组并显示。 有一个默认配置允许在 Storm UI 中显示已确认的数量:
此配置仅测量和显示 5% 的数据流。如果我们需要显示特定 spout 或 Bolt 确认或失败的元组数量,我们必须将此配置更改为:
config.setStatsSampleRate(1.0d);
就像:
Config conf = new Config();
conf.setMaxSpoutPending(5000);
conf.setStatsSampleRate(1.0d);
try {
StormSubmitter.submitTopology( "topologyOne", conf, myTopology.createTopology());
} catch (AlreadyAliveException e) {
e.printStackTrace();
}
然后 Storm UI 开始计数并正确显示拓扑上发生的确认数量:
最后,这是另一个几乎像我的问题: Storm latency caused by ack
关于java - Storm UI 未显示正确确认,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42980253/