java - 如何设置 TOPOLOGY_MAX_SPOUT_PENDING 参数

标签 java apache-kafka apache-storm

在我的拓扑中,我从 Kafka 队列中读取触发消息。收到触发消息后,我需要向 bolt 发送大约 4096 条消息。在 bolt 中,经过一些处理后,它将发布到另一个 Kafka 队列(稍后另一个拓扑将使用它)。

我正在尝试设置 TOPOLOGY_MAX_SPOUT_PENDING 参数来限制发送的消息数量。但我看到它没有效果。是因为我在一个 nextTuple() 方法中发出所有元组吗?如果是这样,应该如何解决?

最佳答案

如果您从 kafka 读取数据,您应该使用 storm 自带的 KafkaSpout。不要尝试实现自己的 spout,相信我,我在生产中使用 KafkaSpout 并且它工作得非常顺利。每条 Kafka 消息只生成一个元组。

正如您在 this nice page from the manual 上看到的那样,您可以像这样设置 topology.max.spout.pending:

Config conf = new Config();
conf.setMaxSpoutPending(5000);
StormSubmitter.submitTopology("mytopology", conf, topology);

topology.max.spout.pending 是为每个 spout 设置的,如果你有四个 spout,你的拓扑中的不完整元组的最大值等于 spout 的数量 * 拓扑。最大 spout.pending.

另一个提示,您应该使用 Storm UI 查看 topology.max.spout.pending 是否设置正确。


记住 topology.max.spout.pending 只是拓扑内部未处理的元组数量,拓扑永远不会停止消费来自 kafka 的消息,至少在生产系统上是这样......如果你想消费 4096 的批处理,你需要在你的 bolt 上实现缓存逻辑,或者使用 storm 以外的东西(面向微批处理的东西)。

关于java - 如何设置 TOPOLOGY_MAX_SPOUT_PENDING 参数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32812448/

相关文章:

java - BitmapFont 按边界获取字符串?

java - "Bean name must not be empty"具有 bean 名称的 spring bean 错误

java - Maven Shade 包含 slf4j-log4j12,尽管已明确从 POM 中排除

apache-spark - 流分析架构。我需要哪个经纪人?

java - 在TestNG的@BeforeMethod上声明Jmockit模拟参数

java - ReSTLet 2.0.11 增加线程数

elasticsearch - Kafka连接器Elasticsearch topic.regex

apache-kafka - 匹配Kafka消费者和生产者分区

database - 数据先放在Kafka还是Database?

java - ProcessSimulator.killAllProcesses(line:78)NoSuchMethodError ConcurrentHashMap.keySet()