java - spring 集成 ExecutorChannel 的多订阅者

标签 java spring spring-integration

在我的应用程序中,我配置了一些 channel ,如下所示:

@Bean
public MessageChannel eventFilterChannel() {
    return new ExecutorChannel(asyncConfiguration.getAsyncExecutor());
}

@Bean
public MessageChannel processEventChannel() {
    return new ExecutorChannel(asyncConfiguration.getAsyncExecutor());
}

我正在使用 ExecutorChannel 并使用我的自定义 Executor,如下所示:

@Configuration  
@EnableAsync
public class AsyncConfiguration extends AsyncConfigurerSupport {

    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(100);
        executor.setMaxPoolSize(100);
        executor.setQueueCapacity(1000);
        executor.setThreadNamePrefix("MyAppThread");
        executor.initialize();
        return executor;
    }
}

我有以下 MessageEndpoint,它是 eventFilterChannel channel 的订阅者:

@MessageEndpoint
public class MyEventFilter {

    @Filter(inputChannel = "eventFilterChannel", outputChannel = "processEventChannel")
    public boolean filterEvents(final MyEvent myEvent) {

            //filter logic

    }

}

理想情况下,我希望我的事件过滤器消息端点是多线程的,因为我使用的是ExecutorChannel。我想了解这是否是多线程端点的正确实现?

但是,我很怀疑,因为我可以在日志中看到以下内容:

Channel 'application.eventFilterChannel' has 1 subscriber(s). 

我的实现是否正确或者是否有我可以遵循的标准?

最佳答案

嗯,有一点误导。您的 eventFilterChannel 实际上只有一个订阅者 - 您的 @Filter。但它确实是多线程的。相同的无状态组件在多个线程中使用。

ExecutorChannel 对传入任务进行排队,并在池中的线​​程上并行执行它们。在我们的例子中,故事是关于消息传递的。不确定代码是否可以帮助您,但它看起来像:

public final boolean dispatch(final Message<?> message) {
    if (this.executor != null) {
        Runnable task = createMessageHandlingTask(message);
        this.executor.execute(task);
        return true;
    }
    return this.doDispatch(message);
}

其中Runnable是这样的:

public void run() {
    doDispatch(message);
}
...

handler.handleMessage(message);

这个处理程序正是该@Filter的订阅者。 因此,从不同的线程调用相同的方法。由于这是被动且无状态的组件,因此仅保留一次并从不同线程中重用是安全的。

另一方面,脱离主题:如果您向此 channel 添加更多订阅者,无论如何它们都不会被并行调用:默认情况下,它是循环策略:根据以下条件选择下一条消息的处理程序指数。 如果一个处理程序无法处理消息,我们将尝试下一个处理程序,依此类推。不过,您可以注入(inject)任何其他自定义实现。或者甚至将其重置为 null 以始终从第一个开始。

关于java - spring 集成 ExecutorChannel 的多订阅者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49384126/

相关文章:

java - 如何使用 Spring 和 Ldap 将登录用户重定向到主页

java - 如何根据配置动态创建多个jms消息驱动 channel 适配器

java - ImapIdleChannelAdapter javax.mail.AuthenticationFailedException : [ALERT] Too many simultaneous connections

java - Android 自定义 ListView 文本过滤器不起作用

java - 如何通过我的类(不是 Activity )访问我的数据库(使用上下文)

java - 如何在没有 spring @Component 或 @Bean 注释的情况下在我的项目中运行不同项目的 servlet-filter

spring-integration - 如何在Spring Cloud Stream中手动确认RabbitMQ消息?

java - Android:通过 URL.getContent() 从互联网获取数据 - 不需要 HttpUrlConnection?

java - 在Spring boot中创建外键-H2数据库

java - Spring无法使用javax.servlet编译jsp