Spring-Integration-DSL:嵌套分散收集挂起

标签 spring-integration-dsl

这是一个损坏但可执行的示例代码:

@Bean
IntegrationFlow testFlow() {
    return IntegrationFlows
            .from(Http.inboundChannelAdapter("test")
                    .requestMapping(mapping -> mapping.methods(HttpMethod.GET))
                    .get())
            .scatterGather(
                    scatterer -> scatterer
                            .applySequence(true)
                            .recipientFlow(flow -> flow
                                    .scatterGather(
                                            scatterer1 -> scatterer1
                                                    .applySequence(true)
                                                    .recipientFlow(IntegrationFlowDefinition::bridge),
                                            gatherer -> gatherer.outputProcessor(MessageGroup::getOne))
                                    .log(INFO, m -> "THIS HAPPENS")),
                    gatherer -> gatherer.outputProcessor(MessageGroup::getOne))
            .log(INFO, m -> "THIS NEVER HAPPENS")
            .get();
}

预期输出是:

THIS HAPPENS
THIS NEVER HAPPENS

实际输出是:

THIS HAPPENS

我找到了this identical looking issue在 Github 上,但它声称已在 5.1.105.2.4 版本中修复。我正在运行 spring-boot-starter-integration 5.5.0 其中包括相同版本的 spring-integration-core 。

我该怎么做才能让这个嵌套的分散聚集工作?这是 DSL 还是我的代码的错误吗?

最佳答案

在仅使用一级分散收集遇到类似问题后,我意识到是日志消息阻止了输出返回到父流。将 .log() 替换为 .logAndReply().log().bridge(),一切都会再次正常工作。

像这样:

@Bean
IntegrationFlow testFlow() {
    return IntegrationFlows
            .from(Http.inboundChannelAdapter("test")
                    .requestMapping(mapping -> mapping.methods(HttpMethod.GET))
                    .get())
            .scatterGather(
                    scatterer -> scatterer
                            .applySequence(true)
                            .recipientFlow(flow -> flow
                                    .scatterGather(
                                            scatterer1 -> scatterer1
                                                    .applySequence(true)
                                                    .recipientFlow(IntegrationFlowDefinition::bridge),
                                            gatherer -> gatherer.outputProcessor(MessageGroup::getOne))
                                    .logAndReply(INFO, m -> "THIS HAPPENS")), // this fixes the problem
                    gatherer -> gatherer.outputProcessor(MessageGroup::getOne))
            .log(INFO, m -> "THIS NEVER HAPPENS")
            .get();
}

关于Spring-Integration-DSL:嵌套分散收集挂起,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/68370133/

相关文章:

java - Spring 集成异常堆栈跟踪像 Apache Camel 一样?

java - 如何在Spring集成DSL中为 channel 设置多个消息处理程序?

spring-boot - Spring集成Java DSL错误

java - Spring 集成中屏障组件如何工作?

java - 使用spring集成dsl逐行读取文件

java - TaskExecutor 不工作 Spring Integration

java - 使用 Spring Integration 5 上的 Spring Integration Java DSL 在入站 channel 上配置目录扫描器

java - 如何在 Spring Integration 中路由处理过程中抛出异常的消息?

spring-integration - 如何在 Spring 集成测试中调用 channel

spring-integration - Spring 集成 : How to dynamically create subdir on sftp using IntegrationFlow