java - Camel : How to join back to a single path after multicast?

标签 java apache-camel

这似乎是一个非常简单的问题,但我已经尝试了所有我能想到的方法。基本上,我有一个计时器路由,可以将其消息发送到一堆不同的 bean。这些 bean 在交换器上设置了一个属性(我也尝试过在消息上设置一个 header ),我希望将所有这些 bean 的交换输出定向到一个过滤器(检查属性或 header ),然后可选地另一个端点。像这样:

                       ---> Bean A ---
                      /               \
timer --> multicast ------> Bean B ------> end --> filter --> endpoint
                      \               /
                       ---> Bean C ---

目前路由看起来像这样,它适用于向 bean 进行多播:

from("timer://my-timer?fixedRate=true&period=20000&delay=0")
  .multicast()
    .to("bean:beanA", "bean:beanB", "bean:beanC");

以下是我尝试过的一些解决方案:

解决方案一

from("timer://my-timer?fixedRate=true&period=20000&delay=0")
  .multicast()
    .to("bean:beanA", "bean:beanB", "bean:beanC")
  .filter(new myPredicate())
    .to("myOptionalEndpoint");

这使过滤器与 bean 并行,而不是在它们之后。

方案二

from("timer://my-timer?fixedRate=true&period=20000&delay=0")
  .multicast()
    .to("bean:beanA", "bean:beanB", "bean:beanC")
  .end()
  .filter(new myPredicate())
    .to("myOptionalEndpoint");

并行执行 bean,然后执行过滤器。但是,未设置属性/ header 。似乎交换是刚结束的计时器,而不是经历过 bean 类的交换......

编辑: 我尝试设置正文,但实际上到达过滤器的消息没有正文。我无法想象 Camel 会以某种方式去除消息的有效负载,因此我必须假设此交换是来自计时器的新交换,而不是经过 bean 的交换。但是,它发生在 bean 完成后

方案三

from("timer://my-timer?fixedRate=true&period=20000&delay=0")
  .multicast()
    .beanRef("beanA").to("direct:temp")
    .beanRef("beanB").to("direct:temp")
    .beanRef("beanC").to("direct:temp")
  .end()

from("direct:temp")
  .filter(new myPredicate())
    .to("myOptionalEndpoint");

消息按预期到达过滤器,但我设置的属性/ header 消失了,因此没有消息通过过滤器。

编辑:尸体也不见了,很明显我没有得到来自 bean 的相同交换...

澄清一下,我正在寻找一种解决方案,其中将来自计时器的单个交换多播到每个 bean(因此现在我们有 3 个交换),然后将这 3 个中的每一个发送到过滤器。

谁能帮我弄清楚如何构建这条路线?

最佳答案

您需要使用聚合策略才能将所有结果聚合为一个。

下面是来自 http://javarticles.com/2015/05/apache-camel-multicast-examples.html 的一个很好的例子(请参阅具有自定义聚合策略的多播部分)

public class CamelMulticastAggregationExample {
  public static final void main(String[] args) throws Exception {
    JndiContext jndiContext = new JndiContext();
    jndiContext.bind("myBean", new MyBean());
    CamelContext camelContext = new DefaultCamelContext(jndiContext);
    try {
        camelContext.addRoutes(new RouteBuilder() {
            public void configure() {
                from("direct:start")
                .multicast()
                .aggregationStrategy(new JoinReplyAggregationStrategy())
                .to("direct:a", "direct:b", "direct:c")
                .end()
                .to("stream:out");

                from("direct:a")
                .to("bean:myBean?method=addFirst");

                from("direct:b")
                .to("bean:myBean?method=addSecond");

                from("direct:c")
                .to("bean:myBean?method=addThird");
            }
        });
        ProducerTemplate template = camelContext.createProducerTemplate();
        camelContext.start();
        template.sendBody("direct:start", "Multicast");
    } finally {
        camelContext.stop();
    }
  }

JoinReplyAggregationStrategy 类如下所示

public class JoinReplyAggregationStrategy implements AggregationStrategy {

  public Exchange aggregate(Exchange exchange1, Exchange exchange2) {
    if (exchange1 == null) {
        return exchange2;
    } else {
        String body1 = exchange1.getIn().getBody(String.class);
        String body2 = exchange2.getIn().getBody(String.class);
        String merged = (body1 == null) ? body2 : body1 + "," + body2;
        exchange1.getIn().setBody(merged);
        return exchange1;
    }
  }
}

更新在您的情况下,您的聚合策略可能是按如下方式将所有交换聚集在一起:

public class ListAggregationStrategy implements AggregationStrategy {

    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
            Message newIn = newExchange.getIn();
            Object newBody = newIn.getBody();
            List list = null;
            if (oldExchange == null) {
                    list = new ArrayList();
                    list.add(newBody);
                    newIn.setBody(list);
                    return newExchange;
            } else {
                    Message in = oldExchange.getIn();
                    list = in.getBody(List.class);
                    list.add(newBody);
                    return oldExchange;
            }
    }

}

关于java - Camel : How to join back to a single path after multicast?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36608517/

相关文章:

java - Camel 日志组件

java - Apache Camel - Groovy 脚本

java - Apache Camel 完成文件名

java - 这个 "NoSuchMethod"异常中缺少哪个方法?

java - 如何在android中实现地点选择器?

Java - 匹配特定文本的正则表达式

java - 从处理器内部设置 Camel 交换属性

Java DSL在camel交换属性中设置对象实例

java - 按指定顺序运行 JUnit4 测试类

java - 测试 junit 时忽略扫描仪输入