有一个包含任务列表的作业。 每个任务都有 id、名称、状态。
我为每个任务创建了服务激活器,如下所示:
@ServiceActivator
public Message<Task> execute(Message<Task> message){
//do stuff
}
我已经为作业创建了一个网关 在集成流程中,从网关开始:
@Bean
public IntegrationFlow startJob() {
return f -> f
.handle("jobService", "execute")
.channel("TaskRoutingChannel");
}
@Bean
public IntegrationFlow startJobTask() {
return IntegrationFlows.from("TaskRoutingChannel")
.handle("jobService", "executeTasks")
.route("headers['Destination-Channel']")
.get();
}
@Bean
public IntegrationFlow TaskFlow() {
return IntegrationFlows.from("testTaskChannel")
.handle("aTaskService", "execute")
.channel("TaskRoutingChannel")
.get();
}
@Bean
public IntegrationFlow TaskFlow2() {
return IntegrationFlows.from("test2TaskChannel")
.handle("bTaskService", "execute")
.channel("TaskRoutingChannel")
.get();
}
我已经使用上面的路由器按顺序执行任务。
但是,我需要启动该作业,并行执行它的所有任务。
我不知道如何实现这一点。我尝试在服务激活器方法上使用 @Async
并使其返回 void
。但在这种情况下,我如何将其链接回路由 channel 并使其开始下一个任务?
请帮忙。谢谢。
编辑:
我使用 RecepientListRouter 和 ExecutorChannel 来获得并行执行:
@Bean
public IntegrationFlow startJobTask() {
return IntegrationFlows.from("TaskRoutingChannel")
.handle("jobService", "executeTasks")
.routeToRecipients(r -> r
.recipient("testTaskChannel")
.recipient("test2TaskChannel"))
.get();
}
@Bean ExecutorChannel testTaskChannel(){
return new ExecutorChannel(this.getAsyncExecutor());
}
@Bean ExecutorChannel test2TaskChannel(){
return new ExecutorChannel(this.getAsyncExecutor());
}
@Bean
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(10);
executor.initialize();
return executor;
}
现在,有 3 个问题: 1)如果这是一个好方法,我如何将有效负载的特定部分发送到每个接收 channel 。假设有效负载是一个 List<>,我想将每个列表项发送到每个 channel 。 2)如何动态设置接收 channel ?从标题说?或者一个 list ? 3)这真的是一个好方法吗?有更好的方法来做到这一点吗?
提前致谢。
最佳答案
您的TaskRoutingChannel
必须是 ExecutorChannel
的实例。例如:
return f -> f
.handle("jobService", "execute")
.channel(c -> c.executor("TaskRoutingChannel", threadPoolTaskExecutor()));
否则,是的:所有内容都通过单个 Thread
调用这对你的任务没有好处。
更新
让我尝试一一回答您的问题,尽管听起来每个问题都必须单独回答:-)。
如果您确实需要向多个服务发送相同的消息,您可以使用
routeToRecipients
,或者可以返回publishSubscribe
。甚至可以根据header
做动态路由例如。要将消息的一部分发送到每个 channel ,有足够的位置
.split()
在您的.routeToRecipients()
之前为了回答您的最后一个问题,我需要了解该任务的业务要求。
关于java - Spring Integration Java DSL - 异步执行多个服务激活器?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31954688/