java - 从直接 channel 流返回值并在 Spring Integration 中继续处理异步流

标签 java spring spring-integration

我的网络应用程序中有以下集成配置:

@Bean
IntegrationFlow giraFlow() {
    return IntegrationFlows.from(
            MessageChannels.direct("gira.input"))
            .split()
            .transform(transformer)
            .handle(parserService)
            .channel(routerChannel())
            .get();
}

@Bean
MessageChannel routerChannel() {
    return MessageChannels.queue("routerChannel", 10)
            .get();
}


@Bean
IntegrationFlow routerChannelFlow() {
    return IntegrationFlows.from(
            routerChannel())
            .route(p -> p.getKind().name(),
                    m -> m.suffix("Channel")
                            .channelMapping(TaskKind.CREATE.name(), "create")
                            .channelMapping(TaskKind.RELOAD.name(), "reload")
            .get();
}

和网关:

@MessagingGateway
public interface GW {

    @Gateway(requestChannel = "gira.input")
    Task gira(Collection<Request> messages);

}

和一个parserService

@Service
@Slf4j
public class ParserService {

    public Task handle(IssueTrackerTask task) {
        log.info("Parser service handling task {}", task);
        return task;
    }
}

我从 Spring MVC Controller 调用网关方法,我希望它返回一个 Task 对象,该对象是 parserService 在其主体方法中返回的。重要的是我希望 Controller 被阻塞,直到它从 parserService 获取值为止。获得此值后,我希望我的集成流程与 routerChannelFlow 异步进行,以便 Web Controller 方法尽快返回,以及 routerChannelFlow 中的所有繁重操作> 将在不阻塞 Controller 的情况下完成。

这是具有此网关方法调用的 Controller 的一部分:

...
Task gira = gw.gira(messages);
log.info("Result {}", gira);
...

问题是 log.info 永远无法到达,并且 gira() 网关被永远阻止。

我怎样才能实现我想要的行为?

附注我的应用程序中实际上不需要 parserService,这只是我认为可以帮助我定义网关的返回值,但它实际上没有帮助:(

更新

这就是我在加里·拉塞尔发表评论后得到的结果:

@Bean
public ThreadPoolTaskExecutor executor() {
    ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
    pool.setCorePoolSize(10);
    pool.setMaxPoolSize(10);
    pool.setWaitForTasksToCompleteOnShutdown(true);
    return pool;
}

@Bean
MessageChannel routerChannel() {
    return MessageChannels
            .publishSubscribe("routerChannel", executor())
            .get();
}

@Bean
IntegrationFlow routerChannelFlow() {
    return IntegrationFlows
            .from(routerChannel())
            .publishSubscribeChannel(s -> s
                 .subscribe(f -> f
                         .bridge(null))
                 .subscribe(process()))
            .get();
}

@Bean
IntegrationFlow process() {
    return f ->
            f.<IssueTrackerTask, String>route(p -> p.getKind().name(),
                    m -> m.suffix("Channel")
                            .channelMapping(TaskKind.CREATE.name(), "create")
                            .channelMapping(TaskKind.RELOAD.name(), "reload")

}

当我尝试使用此管道时,出现以下错误Dispatcher 没有 channel “application:development:9000.dummy”的订阅者。这绝对是一个配置错误的问题,但我无法弄清楚我做错了什么。

更新

channel("dummy")更改为bridge(null)

最佳答案

createreload channel 的下游是什么?

您如何处理 Controller 中的 Task 结果(除了记录它)?

如果不需要结果,请将网关返回更改为void,并在网关下游添加执行器 channel 。

如果您希望返回 Task 对象,则需要 routerChannel 成为一个具有执行者和两个订阅者的发布/订阅 channel - 一个通向无处的桥梁(输入 channel 并且没有输出 channel ),它将把任务返回到网关和路由器,它将在单独的线程上路由任务;来自路由器的下游流不得返回结果(网关早就停止等待结果)。

您可以将两个路由器 channel 设置为执行程序 channel ,而不是将执行程序添加到routerChannel

关于java - 从直接 channel 流返回值并在 Spring Integration 中继续处理异步流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36118961/

相关文章:

spring - 在 bootRun 任务中传递 spring.config.location

java - 如何按列类型拦截 hibernate 查询

java - 如何从 Http 集成流创建 Spring Reactor Flux?

java - 字符串中子字符串的出现次数

java - 如何通过@Cacheable 注解记录缓存命中?

java - 拖动后执行意外操作

java - Spring 多个 : crash early 类型的 bean

spring-integration - 在 spring 集成实用程序中启用日志记录

java - 防止键盘显示字符气泡

java - Servlet 中的线程