在我的应用程序中,我配置了一些 channel ,如下所示:
@Bean
public MessageChannel eventFilterChannel() {
return new ExecutorChannel(asyncConfiguration.getAsyncExecutor());
}
@Bean
public MessageChannel processEventChannel() {
return new ExecutorChannel(asyncConfiguration.getAsyncExecutor());
}
我正在使用 ExecutorChannel
并使用我的自定义 Executor
,如下所示:
@Configuration
@EnableAsync
public class AsyncConfiguration extends AsyncConfigurerSupport {
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(100);
executor.setMaxPoolSize(100);
executor.setQueueCapacity(1000);
executor.setThreadNamePrefix("MyAppThread");
executor.initialize();
return executor;
}
}
我有以下 MessageEndpoint
,它是 eventFilterChannel
channel 的订阅者:
@MessageEndpoint
public class MyEventFilter {
@Filter(inputChannel = "eventFilterChannel", outputChannel = "processEventChannel")
public boolean filterEvents(final MyEvent myEvent) {
//filter logic
}
}
理想情况下,我希望我的事件过滤器消息端点是多线程的,因为我使用的是ExecutorChannel
。我想了解这是否是多线程端点的正确实现?
但是,我很怀疑,因为我可以在日志中看到以下内容:
Channel 'application.eventFilterChannel' has 1 subscriber(s).
我的实现是否正确或者是否有我可以遵循的标准?
最佳答案
嗯,有一点误导。您的 eventFilterChannel
实际上只有一个订阅者 - 您的 @Filter
。但它确实是多线程的。相同的无状态组件在多个线程中使用。
ExecutorChannel
对传入任务进行排队,并在池中的线程上并行执行它们。在我们的例子中,故事是关于消息传递的。不确定代码是否可以帮助您,但它看起来像:
public final boolean dispatch(final Message<?> message) {
if (this.executor != null) {
Runnable task = createMessageHandlingTask(message);
this.executor.execute(task);
return true;
}
return this.doDispatch(message);
}
其中Runnable
是这样的:
public void run() {
doDispatch(message);
}
...
handler.handleMessage(message);
这个处理程序
正是该@Filter
的订阅者。
因此,从不同的线程调用相同的方法。由于这是被动且无状态的组件,因此仅保留一次并从不同线程中重用是安全的。
另一方面,脱离主题:如果您向此 channel 添加更多订阅者,无论如何它们都不会被并行调用:默认情况下,它是循环策略:根据以下条件选择下一条消息的处理程序指数。
如果一个处理程序无法处理消息,我们将尝试下一个处理程序,依此类推。不过,您可以注入(inject)任何其他自定义实现。或者甚至将其重置为 null
以始终从第一个开始。
关于java - spring 集成 ExecutorChannel 的多订阅者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49384126/