我正在使用 Camel JMS 组件进行请求-回复,以便与 MQ 进行通信。对于我的一些请求,我可以收到 n 条回复消息。如何聚合这些回复消息?
我考虑使用带有聚合策略的聚合器模式,但无法使用它,因为我不确定可以回复的消息数量。
社区可以帮助我了解什么是正确的做法吗?我做了一些谷歌搜索但找不到有用的东西。下面是我的示例路线代码
from("direct:"+routeName).routeId(routeName)
.setHeader("JMSCorrelationID", constant(UUID.randomUUID().toString()))
.circuitBreaker()
.resilience4jConfiguration()
.minimumNumberOfCalls(3)
.end()
.to(mqComponentBeanName+"://CAMELDEMO?exchangePattern=InOut&requestTimeout=10000&replyTo=CAMELDEMOREPLY")
.log("${body}")
.unmarshal(customerDetailsOutBound)
.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
System.out.println(exchange.getIn().getBody().toString());
}
})
.onFallback().process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
System.out.println("Store this message to backup");
}
})
.end();
期待从社区获得一些好的见解。谢谢。
最佳答案
消息流
- 您的第一个路由向
CAMELDEMO
队列发送一条消息,并开始在新队列CAMELDEMO_AGGREGATED_REPLY
上等待单个聚合消息 - 在
CAMELDEMO
上接收消息的组件,开始向 CAMELDEMOREPLY 队列发送响应,并指示将发送多少响应 - 下面的第二条路由开始监听
CAMELDEMOREPLY
,聚合消息并将聚合消息发送到CAMELDEMO_AGGREGATED_REPLY
。 - 等待
CAMELDEMO_AGGREGATED_REPLY
回复的第一条路由获取聚合回复,接收单个消息并将其发回
原始路线已更新,等待CAMELDEMO_AGGREGATED_REPLY
上的回复
...
.to(mqComponentBeanName+"://CAMELDEMO?exchangePattern=InOut&requestTimeout=10000&
replyTo=CAMELDEMO_AGGREGATED_REPLY")
.log("${body}")
.unmarshal(customerDetailsOutBound)
.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
System.out.println(exchange.getIn().getBody().toString());
}
})
....
聚合消息的第二条路由
from(mqComponentBeanName+"://CAMELDEMOREPLY?
exchangePattern=In&requestTimeout=10000)
.aggregate(header("JMSCorrelationID"), new MyAggregationStrategy())
.to(mqComponentBeanName+"://CAMELDEMO_AGGREGATED_REPLY?
exchangePattern=Out&requestTimeout=10000)
public final class MyCompletionStrategy implements AggregationStrategy {
@Override
public Exchange aggregate(Exchange oldExch, Exchange newExchange)
{
...
//Here you check your flag regarding the number of responses
// you were supposed to receive, and if it is met
// complete the aggregation by setting it to true
oldExch.setProperty(Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP, true);
...
return oldExchange;
}
}
关于apache-camel - Camel JMS 请求-回复,回复消息为 'n',我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62507347/