我一直在研究 Storm 拓扑,但遇到了一些元组故障。我怀疑其中一个 bolt 在特定情况下没有确认,导致这些超时故障。 Apache Storm API (0.10.0) 中是否有一种方法可以识别哪个 Bolt 未按预期进行确认?
假设我们有 MySpout、BoltA 和 BoltB 作为此拓扑的组件,并且 MySpout 为两个 Bolt 发出元组,期望它们在处理元组后会进行确认。 executeTuple() 方法中的 BoltA 始终进行确认,但 BoltB 仅对其接收到的偶数值进行确认。所有具有奇数值的元组将在发出 10 分钟后失败。
在这个小样本中,很容易识别失败的流程。但在一个复杂的系统中,我们可以跟踪数十个具有多个 bolt 的不同流程,这就像大海捞针一样。有什么聪明的方法来发现这个失败吗?
public class MySpout extends BaseRichSpout {
protected SpoutOutputCollector collector;
//...
@Override
public void nextTuple() {
Integer msgId = new Integer((int)(Math.random() * 5000 + 1));
collector.emit(new Values(msgId), msgId);
}
@Override
public void fail(Object msgId) {
new Exception("Failed tuple. msgId="+msgId).printStackTrace();
}
}
public class BoltA extends BaseRichBolt {
private OutputCollector outputCollector;
//...
@Override
protected void executeTuple(Tuple input) {
Integer n = (Integer) input.getValues().get(0);
outputCollector.ack(input);
}
}
public class BoltB extends BaseRichBolt {
private OutputCollector outputCollector;
//...
@Override
protected void executeTuple(Tuple input) {
Integer n = (Integer) input.getValues().get(0);
if (n%2==0) {
outputCollector.ack(input);
}
}
}
为此 Storm 配置了 10 分钟的超时值。
<!-- storm config -->
<property>
<name>topology.enable.message.timeouts</name>
<value>true</value>
</property>
<property>
<!-- 10 mins -->
<name>topology.message.timeout.secs</name>
<value>600</value>
</property>
这是当元组失败时我们看到的堆栈跟踪。我试图提取元组失败流程的一些信息。它没有说任何对我有帮助的事情。
java.lang.Exception: Failed tuple. msgId=1234
at MySpout.fail(MySpout.java:127) [myJar.jar:?]
at backtype.storm.daemon.executor$fail_spout_msg.invoke(executor.clj:401) [storm-core-0.10.0-beta1.jar:0.10.0-beta1]
at backtype.storm.daemon.executor$fn$reify__4467.expire(executor.clj:461) [storm-core-0.10.0-beta1.jar:0.10.0-beta1]
at backtype.storm.utils.RotatingMap.rotate(RotatingMap.java:73) [storm-core-0.10.0-beta1.jar:0.10.0-beta1]
at backtype.storm.daemon.executor$fn__4464$tuple_action_fn__4470.invoke(executor.clj:466) [storm-core-0.10.0-beta1.jar:0.10.0-beta1]
at backtype.storm.daemon.executor$mk_task_receiver$fn__4455.invoke(executor.clj:433) [storm-core-0.10.0-beta1.jar:0.10.0-beta1]
at backtype.storm.disruptor$clojure_handler$reify__4029.onEvent(disruptor.clj:58) [storm-core-0.10.0-beta1.jar:0.10.0-beta1]
at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125) [storm-core-0.10.0-beta1.jar:0.10.0-beta1]
at backtype.storm.utils.DisruptorQueue.consumeBatch(DisruptorQueue.java:87) [storm-core-0.10.0-beta1.jar:0.10.0-beta1]
at backtype.storm.disruptor$consume_batch.invoke(disruptor.clj:76) [storm-core-0.10.0-beta1.jar:0.10.0-beta1]
at backtype.storm.daemon.executor$fn__4464$fn__4479$fn__4510.invoke(executor.clj:578) [storm-core-0.10.0-beta1.jar:0.10.0-beta1]
at backtype.storm.util$async_loop$fn__543.invoke(util.clj:475) [storm-core-0.10.0-beta1.jar:0.10.0-beta1]
at clojure.lang.AFn.run(AFn.java:22) [clojure-1.6.0.jar:?]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_45]
最佳答案
spout 不知道哪个 Bolt 导致元组失败,这就是为什么您在堆栈跟踪中看不到任何内容。
我将使用1.2.2代码来响应。您很可能可以在 0.10.0 中执行相同的操作,但该版本很古老,我不想深入研究它。
调试此问题的一种方法是在提交拓扑时启用 Storm 的调试日志记录。
Config config = new Config();
config.setDebug(true);
//submit your topology using this Config
当元组失败时,您将得到类似的日志
2019-06-29 12:16:09.552 o.a.s.d.executor Thread-11-word-executor[16 16] [INFO] SPOUT Failing 32496024444700129: {:stream "default", :values [84 1561716922356 116]} REASON: TIMEOUT MSG-ID: 116
然后您可以通过消除找出哪个 Bolt 没有确认元组。如果您在日志中搜索元组 ID(此处为 32496024444700129
值),您将看到每次传输到该元组的 Bolt 的日志,例如
2019-06-29 12:15:22.356 o.a.s.d.executor Thread-11-word-executor[16 16] [INFO] TRANSFERING tuple [dest: 4 tuple: source: word:16, stream: default, id: {32496024444700129=5923978744049352856}, [84, 1561716922356, 116]]
这告诉我元组已转移到任务 4。当拓扑启动时,它记录了哪个 Bolt 是任务 4,您也可以在 Storm UI 中看到这一点。
2019-06-29 12:15:08.801 o.a.s.d.executor main [INFO] Loading executor exclaim1:[4 4]
我可以看到任务 4 确认了元组,所以这不是元组超时的原因
2019-06-29 12:15:22.359 o.a.s.d.task Thread-17-exclaim1-executor[4 4] [INFO] Emitting: exclaim1 __ack_ack [32496024444700129 7387867738466240036]
我还可以看到任务 4 将元组传输到任务 8
2019-06-29 12:15:22.359 o.a.s.d.executor Thread-17-exclaim1-executor[4 4] [INFO] TRANSFERING tuple [dest: 8 tuple: source: exclaim1:4, stream: default, id: {32496024444700129=3796756412183316156}, [84!!!]]
任务 8 处理了它
2019-06-29 12:15:22.363 o.a.s.d.executor Thread-5-exclaim2-executor[8 8] [INFO] Processing received message FOR 8 TUPLE: source: exclaim1:4, stream: default, id: {32496024444700129=3796756412183316156}, [84!!!]
没有任务 8 确认该元组的日志,因此任务 8 是未正确确认的 Bolt。
我在日志中查找任务 8 并得到
2019-06-29 12:15:08.446 o.a.s.d.executor main [INFO] Loaded executor tasks exclaim2:[8 8]
所以“exclaim2”是我的拓扑中未正确确认的 bolt 。
关于java - 如何找出 Storm 拓扑中哪个 bolt 没有确认?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56811848/