java - 如何找出 Storm 拓扑中哪个 bolt 没有确认?

标签 java apache-storm

我一直在研究 Storm 拓扑,但遇到了一些元组故障。我怀疑其中一个 bolt 在特定情况下没有确认,导致这些超时故障。 Apache Storm API (0.10.0) 中是否有一种方法可以识别哪个 Bolt 未按预期进行确认?

假设我们有 MySpout、BoltA 和 BoltB 作为此拓扑的组件,并且 MySpout 为两个 Bolt 发出元组,期望它们在处理元组后会进行确认。 executeTuple() 方法中的 BoltA 始终进行确认,但 BoltB 仅对其接收到的偶数值进行确认。所有具有奇数值的元组将在发出 10 分钟后失败。

在这个小样本中,很容易识别失败的流程。但在一个复杂的系统中,我们可以跟踪数十个具有多个 bolt 的不同流程,这就像大海捞针一样。有什么聪明的方法来发现这个失败吗?

public class MySpout extends BaseRichSpout {
    protected SpoutOutputCollector collector;
    //...
    @Override
    public void nextTuple() {
        Integer msgId = new Integer((int)(Math.random() * 5000 + 1));
        collector.emit(new Values(msgId), msgId);
    }
    @Override
    public void fail(Object msgId) {
        new Exception("Failed tuple. msgId="+msgId).printStackTrace();
    }
}

public class BoltA extends BaseRichBolt {
    private OutputCollector outputCollector;
    //...
    @Override
    protected void executeTuple(Tuple input) {
        Integer n = (Integer) input.getValues().get(0);
        outputCollector.ack(input);
    }
}

public class BoltB extends BaseRichBolt {
    private OutputCollector outputCollector;
    //...
    @Override
    protected void executeTuple(Tuple input) {
        Integer n = (Integer) input.getValues().get(0);
        if (n%2==0) {
            outputCollector.ack(input);
        }
    }
}

为此 Storm 配置了 10 分钟的超时值。

<!-- storm config -->
<property>
    <name>topology.enable.message.timeouts</name>
    <value>true</value>
</property>
<property>
    <!-- 10 mins -->
    <name>topology.message.timeout.secs</name>
    <value>600</value>
</property>

这是当元组失败时我们看到的堆栈跟踪。我试图提取元组失败流程的一些信息。它没有说任何对我有帮助的事情。

java.lang.Exception: Failed tuple. msgId=1234
at MySpout.fail(MySpout.java:127) [myJar.jar:?]
at backtype.storm.daemon.executor$fail_spout_msg.invoke(executor.clj:401) [storm-core-0.10.0-beta1.jar:0.10.0-beta1]
at backtype.storm.daemon.executor$fn$reify__4467.expire(executor.clj:461) [storm-core-0.10.0-beta1.jar:0.10.0-beta1]
at backtype.storm.utils.RotatingMap.rotate(RotatingMap.java:73) [storm-core-0.10.0-beta1.jar:0.10.0-beta1]
at backtype.storm.daemon.executor$fn__4464$tuple_action_fn__4470.invoke(executor.clj:466) [storm-core-0.10.0-beta1.jar:0.10.0-beta1]
at backtype.storm.daemon.executor$mk_task_receiver$fn__4455.invoke(executor.clj:433) [storm-core-0.10.0-beta1.jar:0.10.0-beta1]
at backtype.storm.disruptor$clojure_handler$reify__4029.onEvent(disruptor.clj:58) [storm-core-0.10.0-beta1.jar:0.10.0-beta1]
at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125) [storm-core-0.10.0-beta1.jar:0.10.0-beta1]
at backtype.storm.utils.DisruptorQueue.consumeBatch(DisruptorQueue.java:87) [storm-core-0.10.0-beta1.jar:0.10.0-beta1]
at backtype.storm.disruptor$consume_batch.invoke(disruptor.clj:76) [storm-core-0.10.0-beta1.jar:0.10.0-beta1]
at backtype.storm.daemon.executor$fn__4464$fn__4479$fn__4510.invoke(executor.clj:578) [storm-core-0.10.0-beta1.jar:0.10.0-beta1]
at backtype.storm.util$async_loop$fn__543.invoke(util.clj:475) [storm-core-0.10.0-beta1.jar:0.10.0-beta1]
at clojure.lang.AFn.run(AFn.java:22) [clojure-1.6.0.jar:?]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_45]

最佳答案

spout 不知道哪个 Bolt 导致元组失败,这就是为什么您在堆栈跟踪中看不到任何内容。

我将使用1.2.2代码来响应。您很可能可以在 0.10.0 中执行相同的操作,但该版本很古老,我不想深入研究它。

调试此问题的一种方法是在提交拓扑时启用 Storm 的调试日志记录。

Config config = new Config();
config.setDebug(true);
//submit your topology using this Config

当元组失败时,您将得到类似的日志

2019-06-29 12:16:09.552 o.a.s.d.executor Thread-11-word-executor[16 16] [INFO] SPOUT Failing 32496024444700129: {:stream "default", :values [84 1561716922356 116]} REASON: TIMEOUT MSG-ID: 116

然后您可以通过消除找出哪个 Bolt 没有确认元组。如果您在日志中搜索元组 ID(此处为 32496024444700129 值),您将看到每次传输到该元组的 Bolt 的日志,例如

2019-06-29 12:15:22.356 o.a.s.d.executor Thread-11-word-executor[16 16] [INFO] TRANSFERING tuple [dest: 4 tuple: source: word:16, stream: default, id: {32496024444700129=5923978744049352856}, [84, 1561716922356, 116]]

这告诉我元组已转移到任务 4。当拓扑启动时,它记录了哪个 Bolt 是任务 4,您也可以在 Storm UI 中看到这一点。

2019-06-29 12:15:08.801 o.a.s.d.executor main [INFO] Loading executor exclaim1:[4 4]

我可以看到任务 4 确认了元组,所以这不是元组超时的原因

2019-06-29 12:15:22.359 o.a.s.d.task Thread-17-exclaim1-executor[4 4] [INFO] Emitting: exclaim1 __ack_ack [32496024444700129 7387867738466240036]

我还可以看到任务 4 将元组传输到任务 8

2019-06-29 12:15:22.359 o.a.s.d.executor Thread-17-exclaim1-executor[4 4] [INFO] TRANSFERING tuple [dest: 8 tuple: source: exclaim1:4, stream: default, id: {32496024444700129=3796756412183316156}, [84!!!]]

任务 8 处理了它

2019-06-29 12:15:22.363 o.a.s.d.executor Thread-5-exclaim2-executor[8 8] [INFO] Processing received message FOR 8 TUPLE: source: exclaim1:4, stream: default, id: {32496024444700129=3796756412183316156}, [84!!!]

没有任务 8 确认该元组的日志,因此任务 8 是未正确确认的 Bolt。

我在日志中查找任务 8 并得到

2019-06-29 12:15:08.446 o.a.s.d.executor main [INFO] Loaded executor tasks exclaim2:[8 8]

所以“exclaim2”是我的拓扑中未正确确认的 bolt 。

关于java - 如何找出 Storm 拓扑中哪个 bolt 没有确认?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56811848/

相关文章:

java - ack 方法仅适用于第一个 bolt ,不适用于其他 bolt

apache-spark - 是否建议使用 Kafka 作为事实来源?

java - 如何指定在终端使用哪个版本的 Java?

java - 在base64的javamail中附加大文件

apache-kafka - Storm UI 不正确的值和毛细管工具

java - 从 Spout 发出自定义 Java 对象作为元组

java - Storm 中的延迟队列/消息处理

java - RxJava : How to get all results AND errors from an Observable

java - 浏览文件夹中的文件夹

java - 为什么我的代码没有与手机电池同步?