spring - DefaultMessageListenerContainer 停止处理消息

标签 spring amazon-web-services spring-cloud amazon-sqs spring-jms

我希望这是一个简单的配置问题,但我似乎无法弄清楚它可能是什么。

设置

  • Spring-Boor 2.2.2.RELEASE
  • 云启动器
  • 云启动器-aws
  • spring-jms
  • spring-cloud-dependencyHoxton.SR1
  • amazon-sqs-java-messaging-lib 1.0.8

问题

我的应用程序启动正常并开始处理来自 Amazon SQS 的消息。一段时间后,我看到以下警告

2020-02-01 04:16:21.482 LogLevel=WARN 1 --- [ecutor-thread14] o.s.j.l.DefaultMessageListenerContainer : Number of scheduled consumers has dropped below concurrentConsumers limit, probably due to tasks having been rejected. Check your thread pool configuration! Automatic recovery to be triggered by remaining consumers.

上面的警告被打印多次,最终我看到以下两条INFO消息

2020-02-01 04:17:51.552 LogLevel=INFO 1 --- [ecutor-thread40] c.a.s.javamessaging.SQSMessageConsumer : Shutting down ConsumerPrefetch executor

2020-02-01 04:18:06.640 LogLevel=INFO 1 --- [ecutor-thread40] com.amazon.sqs.javamessaging.SQSSession : Shutting down SessionCallBackScheduler executor

以上 2 条消息将显示多次,并且在某个时刻,SQS 不再使用任何消息。我在日志中没有看到任何其他消息表明存在问题,但我没有从处理程序收到任何消息表明它们正在处理消息(我有 2 个~),并且我可以看到 AWS SQS 队列的消息数量在增长,并且年龄。

~:当我有一个处理程序时,这个确切的代码工作正常,当我添加第二个处理程序时,这个问题就开始了。

配置/代码

我意识到的第一个“警告”是由 ThreadPoolTask​​Executor 的货币引起的,但我无法获得正常工作的配置。这是我当前的 JMS 配置,我尝试了各种级别的最大池大小,除了根据池大小迟早启动警告之外没有任何实际影响

    public ThreadPoolTaskExecutor asyncAppConsumerTaskExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setThreadGroupName("asyncConsumerTaskExecutor");
        taskExecutor.setThreadNamePrefix("asyncConsumerTaskExecutor-thread");
        taskExecutor.setCorePoolSize(10);
        // Allow the thread pool to grow up to 4 times the core size, evidently not
        // having the pool be larger than the max concurrency causes the JMS queue
        // to barf on itself with messages like
        // "Number of scheduled consumers has dropped below concurrentConsumers limit, probably due to tasks having been rejected. Check your thread pool configuration! Automatic recovery to be triggered by remaining consumers"
        taskExecutor.setMaxPoolSize(10 * 4);
        taskExecutor.setQueueCapacity(0); // do not queue up messages
        taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
        taskExecutor.setAwaitTerminationSeconds(60);
        return taskExecutor;
    }

这是我们创建的JMS容器工厂

    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(SQSConnectionFactory sqsConnectionFactory, ThreadPoolTaskExecutor asyncConsumerTaskExecutor) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(sqsConnectionFactory);
        factory.setDestinationResolver(new DynamicDestinationResolver());
        // The JMS processor will start 'concurrency' number of tasks
        // and supposedly will increase this to the max of '10 * 3'
        factory.setConcurrency(10 + "-" + (10 * 3));
        factory.setTaskExecutor(asyncConsumerTaskExecutor);
        // Let the task process 100 messages, default appears to be 10
        factory.setMaxMessagesPerTask(100);
        // Wait up to 5 seconds for a timeout, this keeps the task around a bit longer
        factory.setReceiveTimeout(5000L);
        factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
        return factory;
    }

我根据在互联网上找到的内容添加了 setMaxMessagesPerTasksetReceiveTimeout 调用,如果没有这些调用,并且在各种设置(50、2500L、25、1000L、等等...)

我们创建一个默认的 SQS 连接工厂

    public SQSConnectionFactory sqsConnectionFactory(AmazonSQS amazonSQS) {
        return new SQSConnectionFactory(new ProviderConfiguration(), amazonSQS);
    }

最后处理程序看起来像这样

    @JmsListener(destination = "consumer-event-queue")
    public void receiveEvents(String message) throws IOException {
        MyEventDTO myEventDTO = jsonObj.readValue(message, MyEventDTO.class);
        //messageTask.process(myEventDTO);
    }

    @JmsListener(destination = "myalert-sqs")
    public void receiveAlerts(String message) throws IOException, InterruptedException {
        final MyAlertDTO myAlert = jsonObj.readValue(message, MyAlertDTO.class);
        myProcessor.addAlertToQueue(myAlert);
    }

您可以看到,在第一个函数(receiveEvents)中,我们只是从队列中取出消息并退出,我们还没有为此实现处理代码。 第二个函数 (receiveAlerts) 获取消息,myProcessor.addAlertToQueue 函数创建一个可运行对象并将其提交到线程池进行处理在未来的某个时刻。

只有当我们添加receiveAlerts函数时,问题才开始(警告、信息和消费消息失败),之前只有另一个函数存在,我们没有看到这种行为.

更多

这是一个较大项目的一部分,我正在努力将此代码分解为一个较小的测试用例,看看是否可以重复此问题。我将发布后续结果。

同时

我希望这只是一个配置问题,更熟悉此问题的人可以告诉我我做错了什么,或者有人可以提供一些关于如何纠正此问题以使其正常工作的想法和评论。

谢谢!

最佳答案

在与这个问题斗争了一段时间后,我想我终于解决了它。

该问题似乎是由“DefaultJmsListenerContainerFactory”引起的,该工厂使用“@JmsListener”注释为 EACH 方法创建了一个新的“DefaultJmsListenerContainer”。最初编写代码的人认为它只为应用程序调用一次,创建的容器将被重复使用。所以问题有两个方面

  1. 附加到工厂的“ThreadPoolTask​​Executor”有 40 个线程,当应用程序有 1 个“@JmsListener”方法时,效果很好,但是当我们添加第二个方法时,每个方法都有 10 个线程(总共 20 个)用于监听。不过,这很好;由于我们声明每个监听器最多可以增长 30 个监听器,因此我们很快就耗尽了上面 1 中提到的池中的线程。这导致了“计划消费者数量已降至并发消费者限制以下”错误
  2. 考虑到上述情况,这可能是显而易见的,但我想明确指出。然而,在监听器工厂中,我们将并发设置为“10-30”;所有听众都必须共享该池。因此,必须设置最大并发数,以便每个监听器的最大值足够小,这样如果每个监听器创建其最大值,它就不会超过池中的最大线程数(例如,如果我们有 2 个“@JmsListener” ' 带注释的方法和具有 40 个线程的池,则最大值不能超过 20)。

希望这可以帮助将来遇到类似问题的其他人......

关于spring - DefaultMessageListenerContainer 停止处理消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60042938/

相关文章:

java - 在 Spring DTO 中发布包含可变大小数组的 JSON 对象时要使用的字段?

spring - Spring是否兼容无服务器计算

java - 默认 Spring Autowiring

python - 从 Python 调用 AWS Rekognition HTTP API 的示例

amazon-web-services - 具有大量数据的 ec2 上的最佳实践 cassandra 设置

docker - 如何让Eureka客户端使用主机的IP而不是Docker容器的IP?

spring - 尝试使用本地 Spring Cloud Connector/local_configuration_connector 时找不到合适的云连接器

java - IClientConfig 参数有什么用?

java - 为什么 Spring Boot 不读取我的接口(interface)资源上定义的 PathVariable?

mongodb - StitchServiceError  "aws: "aws_service“是必需的字符串”,errorCodeName : InvalidParameter