我正在使用 JMS Outbound Gateway
将消息发送到请求队列并从单独的响应队列接收消息。我想添加功能,以便在消息成功发送到请求队列后调用特定 bean 的方法。
我正在为此使用 spring-integration 4.0.4
API 和 spring-integration-java-dsl 1.0.0
API,到目前为止我已经能够实现上述功能如下:
@Configuration
@EnableIntegration
public class IntegrationConfig {
...
@Bean
public IntegrationFlow requestFlow() {
return IntegrationFlows
.from("request.ch")
.routeToRecipients(r ->
r.ignoreSendFailures(false)
.recipient("request.ch.1", "true")
.recipient("request.ch.2", "true"))
.get();
}
@Bean
public IntegrationFlow sendReceiveFlow() {
return IntegrationFlows
.from("request.ch.1")
.handle(Jms.outboundGateway(cachingConnectionFactory)
.receiveTimeout(45000)
.requestDestination("REQUEST_QUEUE")
.replyDestination("RESPONSE_QUEUE")
.correlationKey("JMSCorrelationID"), e -> e.requiresReply(true))
.channel("response.ch").get();
}
@Bean
public IntegrationFlow postSendFlow() {
return IntegrationFlows
.from("request.ch.2")
.handle("requestSentService", "fireRequestSuccessfullySentEvent")
.get();
}
...
}
现在,虽然上面的配置有效,但我注意到在 request.ch.2
之前调用 request.ch.1
的唯一明显原因似乎是因为 channel 名称的字母顺序,而不是因为它们添加到 RecipientListRouter
本身的顺序。这样对吗?或者我在这里遗漏了什么?
* 下面的编辑显示了在 JMS 出站/入站适配器方法之间使用聚合器的解决方案(没有消息传递网关)*
集成配置:
@Configuration
@EnableIntegration
public class IntegrationConfig {
...
@Bean
public IntegrationFlow reqFlow() {
return IntegrationFlows
.from("request.ch")
.enrichHeaders(e -> e.headerChannelsToString())
.enrichHeaders(e -> e.headerExpression(IntegrationMessageHeaderAccessor.CORRELATION_ID, "headers['" + MessageHeaders.REPLY_CHANNEL + "']"))
.routeToRecipients(r -> {
r.ignoreSendFailures(false);
r.recipient("jms.req.ch", "true");
r.recipient("jms.agg.ch", "true");
})
.get();
}
@Bean
public IntegrationFlow jmsReqFlow() {
return IntegrationFlows
.from("jms.req.ch")
.handle(Jms.outboundAdapter(cachingConnectionFactory)
.destination("TEST_REQUEST_CH")).get();
}
@Bean
public IntegrationFlow jmsPostReqFlow() {
return IntegrationFlows
.from("jms.req.ch")
.handle("postSendService", "postSendProcess")
.get();
}
@Bean
public IntegrationFlow jmsResFlow() {
return IntegrationFlows
.from(Jms.inboundAdapter(cachingConnectionFactory).destination(
"TEST_RESPONSE_CH"),
c -> c.poller(Pollers.fixedRate(1000).maxMessagesPerPoll(10)))
.channel("jms.agg.ch").get();
}
@Bean
public IntegrationFlow jmsAggFlow() {
return IntegrationFlows
.from("jms.agg.ch")
.aggregate(a -> {
a.outputProcessor(g -> {
List<Message<?>> l = new ArrayList<Message<?>>(g.getMessages());
Message<?> firstMessage = l.get(0);
Message<?> lastMessage = (l.size() > 1) ? l.get(l.size() - 1) : firstMessage;
Message<?> messageOut = MessageBuilder.fromMessage(lastMessage)
.setHeader(MessageHeaders.REPLY_CHANNEL, (String) firstMessage.getHeaders().getReplyChannel())
.build();
return messageOut;
});
a.releaseStrategy(g -> g.size() == 2);
a.groupTimeout(45000);
a.sendPartialResultOnExpiry(false);
a.discardChannel("jms.agg.timeout.ch");
}, null)
.channel("response.ch")
.get();
}
}
@Bean
public IntegrationFlow jmsAggTimeoutFlow() {
return IntegrationFlows
.from("jms.agg.timeout.ch")
.handle(Message.class, (m, h) -> new ErrorMessage(new MessageTimeoutException(m), h))
.channel("error.ch")
.get();
}
}
干杯, 下午
最佳答案
H-m... 看起来像。这确实是 DslRecipientListRouter
逻辑中的错误:https://github.com/spring-projects/spring-integration-java-dsl/issues/9
将很快修复并在几天内发布。
感谢您指出这一点!
顺便说一句。您的逻辑有点不正确:即使我们修复了 RecipientListRouter
,只有在 JmsOutboundGateway
收到 回复后,第二个 recipinet 才会收到相同的请求消息
,不仅仅是在请求被发送到请求队列之后。
它被阻塞的请求-回复过程。并且在 JmsOutboundGateway
中没有获取 reqeust 和 reply 之间的点的钩子(Hook)。
你觉得合适吗?
关于spring-integration - JMS 出站网关请求目的地 - 成功后处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27405495/