您好,与 Apache Storm 合作。我有多个 kafka 主题,我想使用单个 Bolt 解析所有消息(使用并行性来处理负载)。
我想问一下可以吗?以下是我正在尝试的内容
Collection<SpoutSpec<? extends BaseRichBolt>> spouts; // I take this as a method argument
TopologyBuilder topology = new TopologyBuilder();
spouts.forEach(spec -> {
topology.setSpout(spec.getName() + "Spout", new KafkaSpout(spec.getSpoutConfig()), spec.getParallelism());
topology.setBolt("FileBeat-Bolt", new FileBeatMessageBolt(), spec.getParallelism()).shuffleGrouping(spec.getName() + "Spout");
topology.setBolt("Message-Handling-Bolt", new MessageHandlingBolt(), spec.getParallelism()).shuffleGrouping("FileBeat-Bolt");
topology.setBolt("Output-Kafka-Bolt", new ProcessedOutputHandler(), spec.getParallelism()).shuffleGrouping("Message-Handling-Bolt");
});
我的 SpoutSpec 类
public class SpoutSpec<T extends BaseRichBolt> {
private final String name;
private final int parallelism;
private final SpoutConfig spoutConfig;
private final T handler;
}
但是消息不会从 FileBeat-Bolt
发送到其他 Bolt。以下是我发出数据的方式:
JsonNode jsonNode = objectMapper.readValue(input.getString(0), JsonNode.class);
String topic = jsonNode.get("@metadata").get("topic").getTextValue();
String message = jsonNode.get("message").getTextValue();
collector.emit("Message-Handling-Bolt", input, new Values(topic, message));
最佳答案
您的 emit
调用是错误的。第一个参数不是 bolt 名称,而是流名称。当您想要将消息从一个 Bolt 划分为多个数据流时,可以使用流名称。就您而言,您不想拆分流。
collector.emit("Message-Handling-Bolt", input, new Values(topic, message));
将发送到一个名为“Message-Handling-Bolt”的流,并且您在该流上没有任何监听。您的“Message-Handling-Bolt”正在监听默认流。将第一个参数删除为 emit
,或者将您的 Bolt 声明更改为:
topology.setBolt("Message-Handling-Bolt", new MessageHandlingBolt(), spec.getParallelism()).shuffleGrouping("FileBeat-Bolt", "Message-Handling-Bolt");
编辑:回复您的评论: 对您来说最简单的解决方案是简单地删除发出调用中的第一个参数:
collector.emit(input, new Values(topic, message));
如果由于某种原因您不想这样做,并且想要显式命名流,则需要声明您的 FileBeatMessageBolt
将发送到 Message-Handling-Bolt
流。您可以将其作为 declareOutputFields
实现的一部分来执行:
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declareStream("Message-Handling-Bolt", new Fields(...));
}
关于java - 将数据从一个 Bolt 发送到另一个 Apache Storm,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59559676/