java - Storm 集群重复元组

标签 java apache-storm esper

目前,我正在开展一个项目,在该项目中,我在四台 Unix 主机上设置了一个 Storm 集群。

拓扑本身如下:

  1. JMS Spout 监听 MQ 以获取新消息
  2. JMS Spout 解析然后将结果发送给 Esper Bolt
  3. Esper Bolt 然后处理事件并将结果发送到 JMS Bolt
  4. 然后 JMS Bolt 将关于不同主题的消息发布回 MQ

我意识到 Storm 是一个“至少一次”的框架。但是,如果我收到 5 个事件并将这些事件传递给 Esper Bolt 进行计数,那么由于某种原因我在 JMS Bolt 中收到 5 个计数结果(所有值相同)。

理想情况下,我想接收一个结果输出,有什么方法可以让 Storm 忽略重复的元组吗?

我认为这与我设置的并行性有关,因为当我只有一个线程时它会按预期工作:

 TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout(JMS_DATA_SPOUT, new JMSDataSpout(),2).setNumTasks(2);
    builder.setBolt("esperBolt", new EsperBolt.Builder().build(),6).setNumTasks(6)
            .fieldsGrouping(JMS_DATA_SPOUT,new Fields("eventGrouping"));
    builder.setBolt("jmsBolt", new JMSBolt(),2).setNumTasks(2).fieldsGrouping("esperBolt", new Fields("eventName"));

我还看到了 Trident 的“exactly-once”语义。不过,我并不完全相信这会解决这个问题。

最佳答案

如果您的 Esper Bolt 没有在其 execute() 方法末尾显式确认()每个元组或使用 iBasicBolt 实现,那么它接收到的每个元组最终将在超时后由您的源 JMS Spout 重放。

或者,如果您要求您的 bolt “仅处理唯一消息”,请考虑将此处理行为添加到您的 execute() 方法中。它可以首先检查本地 Guava 缓存中的元组值的唯一性,然后进行相应的处理。

关于java - Storm 集群重复元组,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/23764293/

相关文章:

java - 如何通过内部类itemListener修改类变量?

hadoop - 使用 MapReduce 处理 UDP 数据流

java - 从 Esper 查询获取最大值、最小值、第一个柱的最高值和最后一个柱的最低值

wso2 - CEP 如何检测和报告流中的重复项?

java - 使用 Esper,如何对事件进行动态过滤?

java - Scala 隐式转换未应用于 Java 参数模式

java - REST API 授权类型

java - 在 If else 分支中,首选方法是什么?

java - Storm的ExecutorDetails类

java - Apache Storm : ClassNotFoundException when deploying jar to remoteCluster