spring-integration - JMS 出站网关请求目的地 - 成功后处理

标签 spring-integration spring-java-config

我正在使用 JMS Outbound Gateway 将消息发送到请求队列并从单独的响应队列接收消息。我想添加功能,以便在消息成功发送到请求队列后调用特定 bean 的方法。

我正在为此使用 spring-integration 4.0.4 API 和 spring-integration-java-dsl 1.0.0 API,到目前为止我已经能够实现上述功能如下:

@Configuration
@EnableIntegration
public class IntegrationConfig {

    ...

    @Bean
    public IntegrationFlow requestFlow() {

        return IntegrationFlows
            .from("request.ch")
            .routeToRecipients(r ->
                r.ignoreSendFailures(false)
                 .recipient("request.ch.1", "true")
                 .recipient("request.ch.2", "true"))
            .get();
    }

    @Bean
    public IntegrationFlow sendReceiveFlow() {

        return IntegrationFlows
            .from("request.ch.1")
            .handle(Jms.outboundGateway(cachingConnectionFactory)
                    .receiveTimeout(45000)
                    .requestDestination("REQUEST_QUEUE")
                    .replyDestination("RESPONSE_QUEUE")
                    .correlationKey("JMSCorrelationID"), e -> e.requiresReply(true))
                    .channel("response.ch").get();
    }

    @Bean
    public IntegrationFlow postSendFlow() {

        return IntegrationFlows
            .from("request.ch.2")
            .handle("requestSentService", "fireRequestSuccessfullySentEvent")
            .get();
    }

    ...
}

现在,虽然上面的配置有效,但我注意到在 request.ch.2 之前调用 request.ch.1 的唯一明显原因似乎是因为 channel 名称的字母顺序,而不是因为它们添加到 RecipientListRouter 本身的顺序。这样对吗?或者我在这里遗漏了什么?

* 下面的编辑显示了在 JMS 出站/入站适配器方法之间使用聚合器的解决方案(没有消息传递网关)*

集成配置:

@Configuration
@EnableIntegration
public class IntegrationConfig { 

    ...

    @Bean
    public IntegrationFlow reqFlow() {

        return IntegrationFlows
            .from("request.ch")
            .enrichHeaders(e -> e.headerChannelsToString())
            .enrichHeaders(e -> e.headerExpression(IntegrationMessageHeaderAccessor.CORRELATION_ID, "headers['" + MessageHeaders.REPLY_CHANNEL + "']"))             
            .routeToRecipients(r -> {
                r.ignoreSendFailures(false);
                r.recipient("jms.req.ch", "true");
                r.recipient("jms.agg.ch", "true");
            })
            .get();
    }

    @Bean
    public IntegrationFlow jmsReqFlow() {

        return IntegrationFlows
            .from("jms.req.ch")
            .handle(Jms.outboundAdapter(cachingConnectionFactory)
                    .destination("TEST_REQUEST_CH")).get();
    }

    @Bean
    public IntegrationFlow jmsPostReqFlow() {

        return IntegrationFlows
            .from("jms.req.ch")
            .handle("postSendService", "postSendProcess")
            .get();
    }

    @Bean
    public IntegrationFlow jmsResFlow() {

        return IntegrationFlows
            .from(Jms.inboundAdapter(cachingConnectionFactory).destination(
                    "TEST_RESPONSE_CH"),
                    c -> c.poller(Pollers.fixedRate(1000).maxMessagesPerPoll(10)))
            .channel("jms.agg.ch").get();
    }

    @Bean
    public IntegrationFlow jmsAggFlow() {

        return IntegrationFlows
            .from("jms.agg.ch")
            .aggregate(a -> { 
                a.outputProcessor(g -> {
                    List<Message<?>> l = new ArrayList<Message<?>>(g.getMessages());

                    Message<?> firstMessage = l.get(0);
                    Message<?> lastMessage = (l.size() > 1) ? l.get(l.size() - 1) : firstMessage;

                    Message<?> messageOut = MessageBuilder.fromMessage(lastMessage)
                            .setHeader(MessageHeaders.REPLY_CHANNEL, (String) firstMessage.getHeaders().getReplyChannel())
                            .build();

                     return messageOut;
                }); 
                a.releaseStrategy(g -> g.size() == 2);
                a.groupTimeout(45000);
                a.sendPartialResultOnExpiry(false);
                a.discardChannel("jms.agg.timeout.ch");
            }, null)
            .channel("response.ch")
            .get();
        }
    }

    @Bean
    public IntegrationFlow jmsAggTimeoutFlow() {
        return IntegrationFlows
            .from("jms.agg.timeout.ch")
            .handle(Message.class, (m, h) -> new ErrorMessage(new MessageTimeoutException(m), h))
            .channel("error.ch")
            .get();
    }
}

干杯, 下午

最佳答案

H-m... 看起来像。这确实是 DslRecipientListRouter 逻辑中的错误:https://github.com/spring-projects/spring-integration-java-dsl/issues/9 将很快修复并在几天内发布。

感谢您指出这一点!

顺便说一句。您的逻辑有点不正确:即使我们修复了 RecipientListRouter,只有在 JmsOutboundGateway 收到 回复后,第二个 recipinet 才会收到相同的请求消息,不仅仅是在请求被发送到请求队列之后。 它被阻塞的请求-回复过程。并且在 JmsOutboundGateway 中没有获取 reqeust 和 reply 之间的点的钩子(Hook)。

你觉得合适吗?

关于spring-integration - JMS 出站网关请求目的地 - 成功后处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27405495/

相关文章:

java - Spring 集成 sftp :inbound-channel-adapter delete-remote-files=false

java - 持久订阅 ActiveMQ

java - Spring @ComponentScan 注释不起作用

spring-boot - Spring Integration DSL 添加中流事务

java - Spring 集成: How to filter specific headers - only possible with header mapper?

java - 如何在 Spring boot 中设置 UTF-8 字符编码?

java - 对于具有纯 Java 配置的 Spring Web 应用程序,拥有 2 个不同的上下文有意义吗?

java - Spring 启动: delegateBuilder cannot be null on autowiring authenticationManager in custom UserDetailsService

java - 使用 Spring Integration JDBC 出站网关更新数据