我正在通过示例学习 Storm ExclamationTopology
.我想测量一个bolt 的延迟(将!!!
添加到一个单词中所需的时间)和吞吐量(例如,每秒有多少个单词通过一个bolt)。
来自 here ,我可以数出单词的数量和执行bolt的次数:
_countMetric = new CountMetric();
_wordCountMetric = new MultiCountMetric();
context.registerMetric("execute_count", _countMetric, 5);
context.registerMetric("word_count", _wordCountMetric, 60);
我知道 Storm UI 提供
Process Latency
和 Execute Latency
而这个 post很好地解释了它们是什么。但是,我想记录每个 bolt 每次执行的延迟,并将此信息与
word_count
一起使用。计算吞吐量。我如何使用 Storm Metrics完成这个?
最佳答案
虽然你的问题很直接,肯定会引起很多人的兴趣,但它的答案并不像它应该的那么简单。首先,我们需要澄清,我们真正想要衡量的是什么。吞吐量和延迟是术语,很容易理解,但在 Storms 分布式环境中事情变得更加复杂。
正如这篇优秀的 blog post 所描绘的那样, 每个 Storm 主管至少有 3 线程 完成不同的任务。当 Worker Receiver Thread 等待传入的数据元组并将它们聚合成一个块时,它们被发送到 Worker Executor Thread。这包含用户逻辑(在您的情况下是 ExclamationBolt
和处理传出消息的发送者。最后,在每个主管节点上,都有一个 工作线程发送线程 聚合来自所有执行者的消息,聚合它们并将它们发送到网络。
当然,这些线程中的每一个都有自己的延迟和吞吐量。对于 Sender 和 Receiver Thread,它们在很大程度上取决于缓冲区大小,您可以调整这些大小。在您的情况下,您只想测量一个(执行) bolt 的延迟和吞吐量 - 这是可能的,但请记住,那些其他线程会对这个 bolt 产生影响。
我的方法:
为了获得延迟和吞吐量,我使用了旧的 Storm Builtin Metrics .因为我发现文档不是很清楚,我在这里划了一条线:我们是不是 使用新 Storm Metric API v2我们是 不是 使用 Cluster Metrics .
storm.yaml
中来激活 Storm Logging :topology.metrics.consumer.register:
- class: "org.apache.storm.metric.LoggingMetricsConsumer"
parallelism.hint: 1
topology.builtin.metrics.bucket.size.secs: 10
LoggingMetricsConsumer
-Bolt 并将其分发到集群中。在这个节点上,你应该在 Storm 日志中找到相应的度量文件。complete-latency
, execute-latency
等等。对于吞吐量,我将使用包含以下内容的队列指标:arrival_rate_secs
作为每秒插入多少元组的估计。照顾在每个主管上执行的多个线程。祝你好运!
关于java - 如何测量 Storm 拓扑中的延迟和吞吐量,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38838573/