java - Spring Integration Java DSL - 异步执行多个服务激活器?

标签 java spring asynchronous spring-integration dsl

有一个包含任务列表的作业。 每个任务都有 id、名称、状态。

我为每个任务创建了服务激活器,如下所示:

@ServiceActivator
public Message<Task> execute(Message<Task> message){
    //do stuff
}

我已经为作业创建了一个网关 在集成流程中,从网关开始:

@Bean
    public IntegrationFlow startJob() {
        return f -> f
                .handle("jobService", "execute")
                .channel("TaskRoutingChannel");
    }

@Bean
    public IntegrationFlow startJobTask() {
        return IntegrationFlows.from("TaskRoutingChannel")
                .handle("jobService", "executeTasks")
                .route("headers['Destination-Channel']")
                .get();
    }

@Bean
    public IntegrationFlow TaskFlow() {
        return IntegrationFlows.from("testTaskChannel")
                .handle("aTaskService", "execute")
                .channel("TaskRoutingChannel")
                .get();
    }

@Bean
    public IntegrationFlow TaskFlow2() {
        return IntegrationFlows.from("test2TaskChannel")
                .handle("bTaskService", "execute")
                .channel("TaskRoutingChannel")
                .get();
    }

我已经使用上面的路由器按顺序执行任务。

但是,我需要启动该作业,并行执行它的所有任务。 我不知道如何实现这一点。我尝试在服务激活器方法上使用 @Async 并使其返回 void。但在这种情况下,我如何将其链接回路由 channel 并使其开始下一个任务? 请帮忙。谢谢。

编辑:

我使用 RecepientListRouter 和 ExecutorChannel 来获得并行执行:

@Bean
public IntegrationFlow startJobTask() {
    return IntegrationFlows.from("TaskRoutingChannel")
            .handle("jobService", "executeTasks")
            .routeToRecipients(r -> r
                .recipient("testTaskChannel")
                .recipient("test2TaskChannel"))
            .get();
}

@Bean ExecutorChannel testTaskChannel(){
    return new ExecutorChannel(this.getAsyncExecutor());
}

@Bean ExecutorChannel test2TaskChannel(){
    return new ExecutorChannel(this.getAsyncExecutor());
}
@Bean
public Executor getAsyncExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(5);
    executor.setMaxPoolSize(10);
    executor.setQueueCapacity(10);
    executor.initialize();
    return executor;
}

现在,有 3 个问题: 1)如果这是一个好方法,我如何将有效负载的特定部分发送到每个接收 channel 。假设有效负载是一个 List<>,我想将每个列表项发送到每个 channel 。 2)如何动态设置接收 channel ?从标题说?或者一个 list ? 3)这真的是一个好方法吗?有更好的方法来做到这一点吗?

提前致谢。

最佳答案

您的TaskRoutingChannel必须是 ExecutorChannel 的实例。例如:

return f -> f
            .handle("jobService", "execute")
            .channel(c -> c.executor("TaskRoutingChannel", threadPoolTaskExecutor()));

否则,是的:所有内容都通过单个 Thread 调用这对你的任务没有好处。

更新

让我尝试一一回答您的问题,尽管听起来每个问题都必须单独回答:-)。

  1. 如果您确实需要向多个服务发送相同的消息,您可以使用 routeToRecipients ,或者可以返回publishSubscribe 。甚至可以根据header做动态路由例如。

  2. 要将消息的一部分发送到每个 channel ,有足够的位置 .split()在您的.routeToRecipients()之前

  3. 为了回答您的最后一个问题,我需要了解该任务的业务要求。

关于java - Spring Integration Java DSL - 异步执行多个服务激活器?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31954688/

相关文章:

java - 在哪里可以找到 Artifactory 存储库的 Ivy 描述符

java - 'String...' 是什么意思?

java - 我遇到此错误 java.lang.IndexOutOfBoundsException : Index: 0, 大小:0

java - ArrayList 与 Vector 通过引用传递

javascript - Angular promise - POST 请求

javascript - LocalForage async/await getItem() 等待 2 个变量然后完成加载

javascript - 我应该在 Node.js 和 Express 中链接方法和函数吗?

java - 为什么我需要一个存储库和一个服务+契约(Contract)

javascript - 通过javascript创建一个spring表单并附加到div中

java - Spring BOM 发布和依赖项的方法如何工作?