这似乎是一个非常简单的问题,但我已经尝试了所有我能想到的方法。基本上,我有一个计时器路由,可以将其消息发送到一堆不同的 bean。这些 bean 在交换器上设置了一个属性(我也尝试过在消息上设置一个 header ),我希望将所有这些 bean 的交换输出定向到一个过滤器(检查属性或 header ),然后可选地另一个端点。像这样:
---> Bean A ---
/ \
timer --> multicast ------> Bean B ------> end --> filter --> endpoint
\ /
---> Bean C ---
目前路由看起来像这样,它适用于向 bean 进行多播:
from("timer://my-timer?fixedRate=true&period=20000&delay=0")
.multicast()
.to("bean:beanA", "bean:beanB", "bean:beanC");
以下是我尝试过的一些解决方案:
解决方案一
from("timer://my-timer?fixedRate=true&period=20000&delay=0")
.multicast()
.to("bean:beanA", "bean:beanB", "bean:beanC")
.filter(new myPredicate())
.to("myOptionalEndpoint");
这使过滤器与 bean 并行,而不是在它们之后。
方案二
from("timer://my-timer?fixedRate=true&period=20000&delay=0")
.multicast()
.to("bean:beanA", "bean:beanB", "bean:beanC")
.end()
.filter(new myPredicate())
.to("myOptionalEndpoint");
并行执行 bean,然后执行过滤器。但是,未设置属性/ header 。似乎交换是刚结束的计时器,而不是经历过 bean 类的交换......
编辑: 我尝试设置正文,但实际上到达过滤器的消息没有正文。我无法想象 Camel
会以某种方式去除消息的有效负载,因此我必须假设此交换是来自计时器的新交换,而不是经过 bean 的交换。但是,它发生在 bean 完成后。
方案三
from("timer://my-timer?fixedRate=true&period=20000&delay=0")
.multicast()
.beanRef("beanA").to("direct:temp")
.beanRef("beanB").to("direct:temp")
.beanRef("beanC").to("direct:temp")
.end()
from("direct:temp")
.filter(new myPredicate())
.to("myOptionalEndpoint");
消息按预期到达过滤器,但我设置的属性/ header 消失了,因此没有消息通过过滤器。
编辑:尸体也不见了,很明显我没有得到来自 bean 的相同交换...
澄清一下,我正在寻找一种解决方案,其中将来自计时器的单个交换多播到每个 bean(因此现在我们有 3 个交换),然后将这 3 个中的每一个发送到过滤器。
谁能帮我弄清楚如何构建这条路线?
最佳答案
您需要使用聚合策略才能将所有结果聚合为一个。
下面是来自 http://javarticles.com/2015/05/apache-camel-multicast-examples.html 的一个很好的例子(请参阅具有自定义聚合策略的多播
部分)
public class CamelMulticastAggregationExample {
public static final void main(String[] args) throws Exception {
JndiContext jndiContext = new JndiContext();
jndiContext.bind("myBean", new MyBean());
CamelContext camelContext = new DefaultCamelContext(jndiContext);
try {
camelContext.addRoutes(new RouteBuilder() {
public void configure() {
from("direct:start")
.multicast()
.aggregationStrategy(new JoinReplyAggregationStrategy())
.to("direct:a", "direct:b", "direct:c")
.end()
.to("stream:out");
from("direct:a")
.to("bean:myBean?method=addFirst");
from("direct:b")
.to("bean:myBean?method=addSecond");
from("direct:c")
.to("bean:myBean?method=addThird");
}
});
ProducerTemplate template = camelContext.createProducerTemplate();
camelContext.start();
template.sendBody("direct:start", "Multicast");
} finally {
camelContext.stop();
}
}
JoinReplyAggregationStrategy
类如下所示
public class JoinReplyAggregationStrategy implements AggregationStrategy {
public Exchange aggregate(Exchange exchange1, Exchange exchange2) {
if (exchange1 == null) {
return exchange2;
} else {
String body1 = exchange1.getIn().getBody(String.class);
String body2 = exchange2.getIn().getBody(String.class);
String merged = (body1 == null) ? body2 : body1 + "," + body2;
exchange1.getIn().setBody(merged);
return exchange1;
}
}
}
更新在您的情况下,您的聚合策略可能是按如下方式将所有交换聚集在一起:
public class ListAggregationStrategy implements AggregationStrategy {
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
Message newIn = newExchange.getIn();
Object newBody = newIn.getBody();
List list = null;
if (oldExchange == null) {
list = new ArrayList();
list.add(newBody);
newIn.setBody(list);
return newExchange;
} else {
Message in = oldExchange.getIn();
list = in.getBody(List.class);
list.add(newBody);
return oldExchange;
}
}
}
关于java - Camel : How to join back to a single path after multicast?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36608517/