java - 使用 Executor 和 WebSocket 的多线程

标签 java multithreading spring websocket spring-annotations

就我而言,我想创建多个线程执行器来处理收到的陷阱。在同一个应用程序中,我想实现 websocket 以使我的应用程序实时运行。

我有配置类来创建 ThreadPoolExecutor,如下所示

@Configuration
@EnableAsync
@EnableScheduling
@Profile("!" + Constants.SPRING_PROFILE_FAST)
public class AsyncConfiguration implements AsyncConfigurer, EnvironmentAware {
 .......
 @Override
    @Bean
    public Executor getAsyncExecutor() {
        log.debug("Creating Async Task Executor");
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(propertyResolver.getProperty("corePoolSize", Integer.class, 30));
        executor.setMaxPoolSize(propertyResolver.getProperty("maxPoolSize", Integer.class, 150));
        executor.setQueueCapacity(propertyResolver.getProperty("queueCapacity", Integer.class, 10000));
        executor.setThreadNamePrefix("ems-Executor-");
        return new ExceptionHandlingAsyncTaskExecutor(executor);
    }

然后我在 TrapReceiver 类中使用执行器,

@Component
public class TrapReceiver extends Thread implements CommandResponder {

    @Inject
    private ApplicationContext applicationContext;

    @Inject
    private Executor executor;

    public TrapReceiver(){
    }

    List<PDUv1> listPdu = new ArrayList<PDUv1>();
    String message = "";
    long totReceivedTrap = 0;

    @PostConstruct
    public void init() {
        //create thread pool untuk memanage thread puller (thread yang pull dan save device oid value)
        System.out.println("Running trap listener");
        this.start();
    }

    public synchronized void processPdu(CommandResponderEvent cmdRespEvent) {
        PDUv1 pdu = (PDUv1) cmdRespEvent.getPDU();
        listPdu.add(pdu);
        if (pdu != null) {
            if(listPdu.size() == 3){ //3trap per thread
                List<PDUv1> temp = new ArrayList<PDUv1>();
                temp.addAll(listPdu);
                TrapInsertor trapInsertor = (TrapInsertor) applicationContext.getBean("trapInsertor");
                trapInsertor.setProperty(temp);
                executor.execute(trapInsertor);
                listPdu.clear();
            }
        }
        totReceivedTrap++;
        if(totReceivedTrap % 10000 == 0)
            System.out.println("total received trap "+totReceivedTrap);
    }

    public void run() {
        while (true) {
            try {
                this.listen(new UdpAddress(getIp()+"/162")); //where to listen
            } catch (Exception e) {
                e.printStackTrace();
            } 
        }
    }

这段代码运行良好,但是当我尝试在我的应用程序中添加 websocket 功能时,应用程序出现错误。当我使用 @EnableWebSocketMessageBroker 注释时出现错误。这是我的 websocket 配置

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketAppConfig extends AbstractWebSocketMessageBrokerConfigurer {

    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        config.enableSimpleBroker("/topic");
        config.setApplicationDestinationPrefixes("/app");
    }

这就是错误

Caused by: org.springframework.beans.factory.BeanCreationException: Could not autowire field: private java.util.concurrent.Executor com.satunol.ems.snmp.TrapReceiver.executor; nested exception is org.springframework.beans.factory.NoUniqueBeanDefinitionException: No qualifying bean of type [java.util.concurrent.Executor] is defined: expected single matching bean but found 5: getAsyncExecutor,messageBrokerSockJsTaskScheduler,clientInboundChannelExecutor,clientOutboundChannelExecutor,brokerChannelExecutor
    at org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor$AutowiredFieldElement.inject(AutowiredAnnotationBeanPostProcessor.java:555)
    at org.springframework.beans.factory.annotation.InjectionMetadata.inject(InjectionMetadata.java:87)
    at org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor.postProcessPropertyValues(AutowiredAnnotationBeanPostProcessor.java:331)
    ... 16 more
Caused by: org.springframework.beans.factory.NoUniqueBeanDefinitionException: No qualifying bean of type [java.util.concurrent.Executor] is defined: expected single matching bean but found 5: getAsyncExecutor,messageBrokerSockJsTaskScheduler,clientInboundChannelExecutor

我的代码有什么问题,如果我在 websocket 配置或线程池中出错,应该如何。

最佳答案

这里的问题是,由于 [java.util.concurrent.Executor] 有多个可能的 bean,Spring 无法选择正确的一个。 Spring Autowiring 机制默认基于类型。 由于 WebSocket 有自己的 ThreadPoolTask​​Executor 实现,因此您最终得到了 5 个可能的 bean。

Autowiring 机制基于这样的假设:您将提供一个没有任何其他注释的匹配 bean。由于有多个匹配的 bean,因此您需要告诉 Spring 您想要 Autowiring 哪一个。 可以通过注解@Qualifier("beanName")来实现

@Autowired
@Qualifier("getAsyncExecutor")
private Executor executor;

希望这有帮助!

一些例子: Example of @Qualifier annotation

关于java - 使用 Executor 和 WebSocket 的多线程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30653028/

相关文章:

java - 更改实例化对象的值

java - 在 spring 中使用注解时出现错误

java - 创建包含用户特定数据的 WebSocket session 的最安全方法

c - pthread编程,线程不会同时运行

python - pygtk 应用程序中的单独线程

html - 如何将默认值添加到 Thymeleaf 中的输入字段

java - 无法解析对 bean 'cacheManager' 的引用

java - 停止从 readLine() 方法中读取数据的线程

java - java中jpa一对多数据插入错误

java - 我怎样才能加快 Spring 的形式 :options tag?