apache-camel - Camel JMS 请求-回复,回复消息为 'n'

标签 apache-camel ibm-mq camel-jms

我正在使用 Camel JMS 组件进行请求-回复,以便与 MQ 进行通信。对于我的一些请求,我可以收到 n 条回复消息。如何聚合这些回复消息?

我考虑使用带有聚合策略的聚合器模式,但无法使用它,因为我不确定可以回复的消息数量。

社区可以帮助我了解什么是正确的做法吗?我做了一些谷歌搜索但找不到有用的东西。下面是我的示例路线代码

from("direct:"+routeName).routeId(routeName)
                        .setHeader("JMSCorrelationID", constant(UUID.randomUUID().toString()))
                        .circuitBreaker()
                            .resilience4jConfiguration()
                            .minimumNumberOfCalls(3)
                        .end()
                        .to(mqComponentBeanName+"://CAMELDEMO?exchangePattern=InOut&requestTimeout=10000&replyTo=CAMELDEMOREPLY")
                            .log("${body}")
                            .unmarshal(customerDetailsOutBound)
                            .process(new Processor() {
                                    @Override
                                    public void process(Exchange exchange) throws Exception {
                                        System.out.println(exchange.getIn().getBody().toString());
                                    }
                            })
                        .onFallback().process(new Processor() {
                            @Override
                            public void process(Exchange exchange) throws Exception {
                                System.out.println("Store this message to backup");
                            }
                        })
                        .end();

期待从社区获得一些好的见解。谢谢。

最佳答案

消息流

  1. 您的第一个路由向 CAMELDEMO 队列发送一条消息,并开始在新队列 CAMELDEMO_AGGREGATED_REPLY 上等待单个聚合消息
  2. CAMELDEMO 上接收消息的组件,开始向 CAMELDEMOREPLY 队列发送响应,并指示将发送多少响应
  3. 下面的第二条路由开始监听 CAMELDEMOREPLY,聚合消息并将聚合消息发送到 CAMELDEMO_AGGREGATED_REPLY
  4. 等待 CAMELDEMO_AGGREGATED_REPLY 回复的第一条路由获取聚合回复,接收单个消息并将其发回

原始路线已更新,等待CAMELDEMO_AGGREGATED_REPLY上的回复

...
.to(mqComponentBeanName+"://CAMELDEMO?exchangePattern=InOut&requestTimeout=10000&
                replyTo=CAMELDEMO_AGGREGATED_REPLY")
.log("${body}")
.unmarshal(customerDetailsOutBound)
.process(new Processor() {
        @Override
        public void process(Exchange exchange) throws Exception {
            System.out.println(exchange.getIn().getBody().toString());
        }
})
....

聚合消息的第二条路由

from(mqComponentBeanName+"://CAMELDEMOREPLY?
                          exchangePattern=In&requestTimeout=10000)
.aggregate(header("JMSCorrelationID"), new MyAggregationStrategy())
.to(mqComponentBeanName+"://CAMELDEMO_AGGREGATED_REPLY?
                          exchangePattern=Out&requestTimeout=10000)
public final class MyCompletionStrategy implements AggregationStrategy {
    @Override
    public Exchange aggregate(Exchange oldExch, Exchange newExchange) 
    {
        ...
        //Here you check your flag regarding the number of responses
        // you were supposed to receive, and if it is met
        // complete the aggregation by setting it to true
        oldExch.setProperty(Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP, true);
                ...
         return oldExchange;
     }
}

关于apache-camel - Camel JMS 请求-回复,回复消息为 'n',我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62507347/

相关文章:

ibm-mq - 哪个是连接大型机和 java 的经过验证的解决方案? MQ系列/IBM CICS Transaction Gateway哪个最好?

java - 成功处理 Camel 内部重新传递后如何防止来自 MQ Broker 的消息重新传递(事务性 Camel 路由)

java - ActiveMQ:队列(具有并发消费者)和主题的正确配置

java - Camel : java. lang.IllegalArgumentException:必须指定 defaultEndpoint

ibm-mq - IBM WAS7队列工厂配置到MQ集群

java - 如何发出 JMS 同步请求

java - JMS如何读取队列上的多个文件

c# - .net 中的 Apache Camel 替代品?

jaxb - 如何从xml中删除元素名称?