java - Spring Integration Java DSL -- 聚合器的配置

标签 java spring-integration dsl aggregator

我有一个非常简单的集成流程,其中使用发布-订阅 channel 将 RESTful 请求转发给两个提供者。然后将来自两个 RESTful 服务的结果聚合到一个数组中。集成流程示意图如下:

@Bean
IntegrationFlow flow() throws Exception {
    return IntegrationFlows.from("inputChannel")
            .publishSubscribeChannel(s -> s.applySequence(true)
                .subscribe(f -> f
                        .handle(Http.outboundGateway("http://provider1.com/...")
                                .httpMethod(HttpMethod.GET)
                                .expectedResponseType(ItemDTO[].class))
                ).subscribe(f -> f
                        .handle(Http.outboundGateway("http://provider2.com/...")
                                .httpMethod(HttpMethod.GET)
                                .expectedResponseType(ItemDTO[].class)
                        )
                )
            )
            .aggregate()
            .get();
}

但是,在运行我的代码时,生成的数组仅包含由一个 RESTful 服务返回的项目。是否缺少任何配置步骤?

更新

考虑到 Artem 的评论,以下版本对应于完整的解决方案。

@Bean
IntegrationFlow flow() throws Exception {
    return IntegrationFlows.from("inputChannel-scatter")
            .publishSubscribeChannel(s -> s.applySequence(true)
                .subscribe(f -> f
                        .handle(Http.outboundGateway("http://provider1.com/...")
                                .httpMethod(HttpMethod.GET)
                                .expectedResponseType(ItemDTO[].class))
                        .channel("inputChannel-gather"))
                .subscribe(f -> f
                        .handle(Http.outboundGateway("http://provider2.com/...")
                                .httpMethod(HttpMethod.GET)
                                .expectedResponseType(ItemDTO[].class))
                        .channel("inputChannel-gather")))
            .get();
}

@Bean
IntegrationFlow gatherFlow() {
    return IntegrationFlows.from("inputChannel-gather")
            .aggregate(a -> a.outputProcessor(g ->  new GenericMessage<ItemDTO[]>(
                        g.getMessages().stream()
                                .flatMap(m -> Arrays.stream((ItemDTO[]) m.getPayload()))
                                .collect(Collectors.toList()).toArray(new ItemDTO[0]))))
            .get();
}

最佳答案

实际上不是那样的。

.aggregate() 是该 publishSubscribeChannel第三个订阅者。

您必须将您的流量切断给其中两个。像这样:

    @Bean
    public IntegrationFlow publishSubscribeFlow() {
        return flow -> flow
                .publishSubscribeChannel(s -> s
                        .applySequence(true)
                        .subscribe(f -> f
                                .handle((p, h) -> "Hello")
                                .channel("publishSubscribeAggregateFlow.input"))
                        .subscribe(f -> f
                                .handle((p, h) -> "World!")
                                .channel("publishSubscribeAggregateFlow.input"))
                );
    }

    @Bean
    public IntegrationFlow publishSubscribeAggregateFlow() {
        return flow -> flow
                .aggregate(a -> a.outputProcessor(g -> g.getMessages()
                        .stream()
                        .<String>map(m -> (String) m.getPayload())
                        .collect(Collectors.joining(" "))))
                .channel(c -> c.queue("subscriberAggregateResult"));
    }

请注意两个订阅者的 .channel("publishSubscribeAggregateFlow.input") 用法。

老实说,这是任何发布-订阅的重点。如果我们要聚合它们,我们必须知道将所有订阅者的结果发送到哪里。

您的用例让我想起了 Scatter-Gather EIP模式。

我们还没有在 DSL 中实现它。 欢迎提出GH issue在这个问题上,我们将尝试在即将到来的 1.2 版本中处理它。

更新

关于此事的GH问题:https://github.com/spring-projects/spring-integration-java-dsl/issues/75

关于java - Spring Integration Java DSL -- 聚合器的配置,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36742888/

相关文章:

java - 应用程序子类中的奇怪异常

java - 如何从 java 运行多线程 python 脚本?

Clojure "DSL"编程

elasticsearch - Elasticsearch查询以查找丢失的记录

java - 无法通过 spring 配置从 jar 检索类路径中的私钥 - jar 内的文件对 spring 不可见

c# - 如何设计流畅的界面(用于异常处理)?

java - JProgressBar 更新不工作

java - 什么是 GWT 开发者插件协议(protocol)

javascript - CORS 不适用于 jQuery 和 Java

java - 如何使用 Spring Integration 调用安全的 SOAP 服务