java - 并发 SQS 队列监听器

标签 java spring spring-boot amazon-sqs

我不明白 SQS QueueListener 是如何工作的。

这是我的配置:

    /**
     * AWS Credentials Bean
     */
    @Bean
    public AWSCredentials awsCredentials() {
        return new BasicAWSCredentials(accessKey, secretAccessKey);
    }

    /**
     * AWS Client Bean
     */
    @Bean
    public AmazonSQS amazonSQSAsyncClient() {
        AmazonSQS sqsClient = new AmazonSQSClient(awsCredentials());
        sqsClient.setRegion(Region.getRegion(Regions.US_EAST_1));
        return sqsClient;
    }

    /**
     * AWS Connection Factory
     */
    @Bean
    public SQSConnectionFactory connectionFactory() {
        SQSConnectionFactory.Builder factoryBuilder = new SQSConnectionFactory.Builder(
                Region.getRegion(Regions.US_EAST_1));
        factoryBuilder.setAwsCredentialsProvider(new AWSCredentialsProvider() {

            @Override
            public AWSCredentials getCredentials() {
                return awsCredentials();
            }

            @Override
            public void refresh() {
            }

        });
        return factoryBuilder.build();
    }

    /**
     * Registering QueueListener for queueName
     */
    @Bean
    public DefaultMessageListenerContainer defaultMessageListenerContainer() {
        DefaultMessageListenerContainer messageListenerContainer = new DefaultMessageListenerContainer();
        messageListenerContainer.setConnectionFactory(connectionFactory());
        messageListenerContainer.setDestinationName(queueName);
        messageListenerContainer.setMessageListener(new MessageListenerAdapter(new LabQueueListener()));
        messageListenerContainer.setErrorHandler(new QueueListenerErrorHandler());
        messageListenerContainer.setTaskExecutor(Executors.newFixedThreadPool(3));

        return messageListenerContainer;
    }

如您所见,我已经使用 Executors.newFixedThreadPool(3) 配置了我的 DefaultMessageListenerContainer

通过这种方式,我希望在我的队列监听器中同时执行 3 个并发任务。

这是我的听众逻辑:

public class QueueListener {

    public void handleMessage(String messageContent) {
        try {
            logger.info(String.format("message received: %s", messageContent));
            logger.info("wait 30 sec");
            Thread.sleep(1000 * 30);
            logger.info("done");
        } catch (Throwable th) {
            throw new QueueListenerException(messageContent, th);
        }
    }
}

现在每个 handleMessage 方法 block (Thread.sleep(1000 * 30);) 执行 30 秒,只有 1 个 handleMessage 方法一次执行。

我做错了什么? 如何实现一次并发调用handleMessage方法? 使用当前配置,我希望同时执行 3 个 handleMessage

最佳答案

您可以通过添加 messageListenerContainer.setConcurrency("3-10"); 在 DefaultMessageListenerConfigurator 的 bean 中添加参数来处理并发执行;这意味着它将从 3 个线程开始并扩展到 10 个.
concurrentConsumers 的数量也可以使用 messageListenerContainer.setConcurrentConsumers(3);

设置

引用:https://docs.spring.io/spring/docs/current/javadoc-api/org/springframework/jms/listener/DefaultMessageListenerContainer.html#setConcurrency-java.lang.String-

关于java - 并发 SQS 队列监听器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32307428/

相关文章:

java - 启动 jnlp 文件时出错 : Could not create the Java Virtual Machine and A fatal exception has occurred. 程序将退出

java - 以 json 格式向客户端返回消息以及数据 - Java/Spring/REST

java - Stackdriver Spring Boot 缺少有效 key

maven - 在加特林模拟之前启动 spring-boot 应用程序

spring-boot - Spring 靴。 @DataJpaTest H2嵌入式数据库创建架构

java - 最好的 RESTful JavaScript API 是什么?

java - 如何将 Swagger 与 Maven + Java + Jersey +Tomcat 集成

java - 为什么忽略不相关方法上的 Autowired

java - 单元测试编译时织入

database - 为什么我的千分尺在应用程序启动后只执行一次?