spring-integration - 分解 DSL IntegrationFlows

标签 spring-integration

我一直在研究 Spring Integration (SI) DSL。我有一个定义了以下异步网关的 Rest 服务:

@MessagingGateway
public interface Provision {
    @Async
    @Gateway(requestChannel = "provision.input")
    ListenableFuture<List<ResultDto>> provision(List<ItemsDto> stuff);
}

在逐行演练中,我有以下示例 IntegrationFlow。

@Bean
public IntegrationFlow provision() {
    return f -> f
            .split(ArrayList.class, List::toArray)
            .channel(c -> c.executor(Executors.newCachedThreadPool()))
            .<ItemsDto, String>route(ItemsDto::getType, m -> m
                            .subFlowMapping("IPTV", sf -> sf
                                            .<ItemsDto, String>route(ItemsDto::getAction, m2 -> m2
                                                    .subFlowMapping("OPEN", sf2 -> sf2
                                                            .handle((p, h) -> iptvService.open((ItemsDto) p))))
                            )
            )
            .aggregate();
}

如您所见,我有多层路由。我需要把事情分解一下。我已经尝试了几种不起作用的方法(在这里我没有得到响应......线程不等待):

@Bean(name = "routerInput")
private MessageChannel routerInput() {
    return MessageChannels.direct().get();
}

@Bean
public IntegrationFlow provision() {
    return f -> f
            .split(ArrayList.class, List::toArray)
            .channel(c -> c.executor(Executors.newCachedThreadPool()))
            .<ItemsDto, String>route(ItemsDto::getType, m ->
                            m.subFlowMapping("IPTV", sf -> sf.channel("routerInput"))
            )
            .aggregate();
}

@Bean
public IntegrationFlow action() {
    return IntegrationFlows.from("routerInput")
            .<ItemsDto, String>route(ItemsDto::getAction, m -> m
                    .subFlowMapping("OPEN", sf -> sf
                            .handle(p -> iptvService.open((ItemsDto) p.getPayload())))).get();
}

我显然在概念上遗漏了一些东西 :) 有人可以提供“如何以及为什么”的意见吗?

我有一个需要拆分的项目列表,按“类型”路由,然后按“操作”路由,最后聚合(包含处理程序的响应)。每个处理的项目都需要并行处理。

提前致谢

更新: 根据 Artem 的建议,我删除了所有异步内容。我把它修剪成几乎没有...

@Bean(name = "routerInput")
private MessageChannel routerInput() {
    return MessageChannels.direct().get();
}

@Bean
public IntegrationFlow provision() {
    return f -> f
            .split()
            .<ItemDto, String>route(ItemDto::getType, m ->
                    m.subFlowMapping("IPTV", sf -> sf.channel("routerInput")))
            .aggregate();
}

@Bean
public IntegrationFlow action() {
    return IntegrationFlows.from("routerInput")
            .<ItemDto, String>route(ItemDto::getAction, m -> m
                    .subFlowMapping("OPEN", sf -> sf
                            .handle((p, h) -> iptvService.open((ItemDto) p)))).get();
}

我让它通过改变来响应

.handle(p ->

对此

.handle((p, h) ->

所以它至少会响应,但不会聚合拆分的 3 个测试项目。输出包含 1 个项目。我需要使用流收集吗?发布政策?这不应该没问题吗?

最佳答案

如果你想把它拆开,使用 channelMapping 可能比 subflowMapping 更简单......

    @Bean
    public IntegrationFlow typeRoute() {
        return IntegrationFlows.from(foo())
                .split()
                .<ItemsDto, String>route(ItemsDto::getType, m -> m
                        .channelMapping("foo", "channel1")
                        .channelMapping("bar", "channel2"))
                .get();
    }

    @Bean
    public IntegrationFlow fooActionRoute() {
        return IntegrationFlows.from(channel1())
                .<ItemsDto, String>route(ItemsDto::getAction, m -> m
                        .channelMapping("foo", "channel3")
                        .channelMapping("bar", "channel4"))
                .get();
    }

    @Bean
    public IntegrationFlow barActionRoute() {
        return IntegrationFlows.from(channel1())
                .<ItemsDto, String>route(ItemsDto::getAction, m -> m
                        .channelMapping("foo", "channel5")
                        .channelMapping("bar", "channel6"))
                .get();
    }

    @Bean
    public IntegrationFlow fooFooHandle() {
        return IntegrationFlows.from(channel3())
                // handle
                .channel(aggChannel())
                .get();
    }

为其他选项创建流程并聚合每个结果:

    // fooBarHandle(), barFooHandle(), barBarHandle()


    @Bean IntegrationFlow agg() {
        return IntegrationFlows.from(aggChannel())
                .aggregate()
                .get();
    }

并行度是通过使用 ExecutorChannels...

    @Bean
    public MessageChannel channel1() {
        return new ExecutorChannel(exec());
    }

    @Bean
    public MessageChannel channel2() {
        return new ExecutorChannel(exec());
    }

    @Bean
    public MessageChannel channel3() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel channel4() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel channel5() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel channel6() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel aggChannel() {
        return new DirectChannel();
    }

关于spring-integration - 分解 DSL IntegrationFlows,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34706312/

相关文章:

Spring Integration DefaultFtpsSessionFactory

java - spring集成kafka监听线程当并发=分区计数时读取多个分区

java - 在 Spring 集成中设置生存时间的动态值

java - 如何通过 ftp 入站 channel 适配器轮询本地 ftp 目录?

java - Spring集成出站网关想要使用动态URL

spring-boot - Spring Boot 上 Spring 集成的 Kafka 配置

java - 将 Spring Integration executorChannel 与 Spring Cloud Function 结合使用

使用 spring-integration-ip 的 Java 服务器

spring-integration - Spring 集成 dsl : route by payload type

Spring Integration FTP - 轮询而不传输?