java - 将数据从一个 Bolt 发送到另一个 Apache Storm

标签 java apache-storm

您好,与 Apache Storm 合作。我有多个 kafka 主题,我想使用单个 Bolt 解析所有消息(使用并行性来处理负载)。

我想问一下可以吗?以下是我正在尝试的内容

Collection<SpoutSpec<? extends BaseRichBolt>> spouts; // I take this as a method argument

TopologyBuilder topology = new TopologyBuilder();

    spouts.forEach(spec -> {
        topology.setSpout(spec.getName() + "Spout", new KafkaSpout(spec.getSpoutConfig()), spec.getParallelism());
        topology.setBolt("FileBeat-Bolt", new FileBeatMessageBolt(), spec.getParallelism()).shuffleGrouping(spec.getName() + "Spout");
        topology.setBolt("Message-Handling-Bolt", new MessageHandlingBolt(), spec.getParallelism()).shuffleGrouping("FileBeat-Bolt");
        topology.setBolt("Output-Kafka-Bolt", new ProcessedOutputHandler(), spec.getParallelism()).shuffleGrouping("Message-Handling-Bolt");
    });

我的 SpoutSpec 类

public class SpoutSpec<T extends BaseRichBolt> {

    private final String name;

    private final int parallelism;

    private final SpoutConfig spoutConfig;

    private final T handler;

}

但是消息不会从 FileBeat-Bolt 发送到其他 Bolt。以下是我发出数据的方式:

JsonNode jsonNode = objectMapper.readValue(input.getString(0), JsonNode.class);

String topic = jsonNode.get("@metadata").get("topic").getTextValue();

String message = jsonNode.get("message").getTextValue();

collector.emit("Message-Handling-Bolt", input, new Values(topic, message));

最佳答案

您的 emit 调用是错误的。第一个参数不是 bolt 名称,而是流名称。当您想要将消息从一个 Bolt 划分为多个数据流时,可以使用流名称。就您而言,您不想拆分流。

collector.emit("Message-Handling-Bolt", input, new Values(topic, message));

将发送到一个名为“Message-Handling-Bolt”的流,并且您在该流上没有任何监听。您的“Message-Handling-Bolt”正在监听默认流。将第一个参数删除为 emit,或者将您的 Bolt 声明更改为:

topology.setBolt("Message-Handling-Bolt", new MessageHandlingBolt(), spec.getParallelism()).shuffleGrouping("FileBeat-Bolt", "Message-Handling-Bolt");

编辑:回复您的评论: 对您来说最简单的解决方案是简单地删除发出调用中的第一个参数:

collector.emit(input, new Values(topic, message));

如果由于某种原因您不想这样做,并且想要显式命名流,则需要声明您的 FileBeatMessageBolt 将发送到 Message-Handling-Bolt 流。您可以将其作为 declareOutputFields 实现的一部分来执行:

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declareStream("Message-Handling-Bolt", new Fields(...));
}

关于java - 将数据从一个 Bolt 发送到另一个 Apache Storm,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59559676/

相关文章:

java - Apache Storm - 无法找到或加载主类 org.apache.storm.starter.ExclamationTopology

java - 如何在抽象类中访问 TopologyContext

java - 通过返回值更改字段与通过在方法中引用字段来更改字段?

java - 为什么 spring 不使用通用限定符注入(inject)?

java - OpenJDK 11 问题 - 客户端在最后一次 UNWRAP 之前完成握手

docker - Storm :java. lang.RuntimeException: Returned channel 实际上没有建立

bigdata - Storm 的 HdfsBolt 也可以在超时后刷新数据吗?

java - 在Java中将数据写入excel第一个选项卡数据在写入第二个选项卡时不显示

java - Java SSLSocket 和 HTML5 之间的通信

java - 创建一个 backtype.storm.tuple.Tuple 用于测试目的?