java - 为什么使用的线程数高于要求?

标签 java multithreading spring-boot threadpoolexecutor

我有一个 SpringBoot 应用程序,我允许最多 45 个并发请求。 现在,1 个请求 在其旅程中使用 threadPool A 并行调用 16 个外部服务。因此,请牢记平均情况和最坏情况,我一直遵循以下配置:

ThreadPoolTaskExecutor A = new ThreadPoolTaskExecutor();
A.setCorePoolSize(400);
A.setMaxPoolSize(1000);
A.setQueueCapacity(10);
A.setThreadNamePrefix("async-executor");
A.initialize();

我的预期是最多 45*16 = 720 个线程将被使用。但是在运行负载测试时,我观察到线程不断打开(在线程转储中检查),几分钟后它开始给出 RejectedExecutionException。

RejectedExecutionException
Task ServiceX rejected from org.springframework.scheduling.concurrent.
ThreadPoolTaskExecutor$1@4221a19e[Running, pool
size = 1000, active threads = 2, queued tasks = 10, completed tasks = 625216]

线程转储中显示的大多数线程

"executor-A-57" #579 prio=5 os_prio=0 tid=0x000000000193f800 nid=0x2e95 waiting on condition [0x00007fa9e820c000]
   java.lang.Thread.State: TIMED_WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x0000000582dadf90> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
    at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)
    at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1073)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
    - None

我想知道我在这里缺少什么?为什么我会被拒绝?

编辑: 我试图在一小段代码上复制类似的东西,这里是:

MainClass 运行一个长循环。在每个循环中,它调用 service1 3 次。现在我有演示服务,里面只有相同的代码 Thread.sleep(100)

主类.java

package com.flappy.everything.threadpooling;

import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class MainClass {

    private static ThreadPoolTaskExecutor getExecutor() {
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        threadPoolTaskExecutor.setMaxPoolSize(4);
        threadPoolTaskExecutor.setThreadNamePrefix("async-exec");
        threadPoolTaskExecutor.setCorePoolSize(4);
        threadPoolTaskExecutor.setQueueCapacity(2);
        threadPoolTaskExecutor.initialize();
        return threadPoolTaskExecutor;
    }

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ThreadPoolTaskExecutor outerExecutor = getExecutor();
        List<Service1> services = Arrays.asList(new Service1(), new Service1(), new Service1());
        for (int i = 0; i < 1000000; i++) {
            List<Future> futures = new ArrayList<>();
            for (Service1 service : services) {
                futures.add(outerExecutor.submit(() -> {
                    try {
                        service.set();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }));
            }
            for (Future future : futures) {
                future.get();
            }
        }
    }
}

服务1.java

package com.oyorooms.everything.threadpooling;

import org.springframework.scheduling.annotation.Async;

public class Service1 {
    public void set() throws InterruptedException {
        Thread.sleep(100);
        System.out.println(Thread.currentThread().getName());
    }
}

所以理想情况下,应该只为我提供的线程池打开 3 个线程,但我仍然在运行代码时遇到拒绝。

最佳答案

这很有趣。

您列出的代码失败的原因是因为将元素从工作队列传输到工作线程所花费的时间比主线程将项目放入队列所花费的时间慢。

流程是这样的:

if(there are active threads and is there availability on the queue){
    submit to the work queue for the worker threads to pick up // 1
} else {
   if(max pool size is not met){
      create a new thread with this task being its first task // 2
   } else { 
      reject // 3
   }
} 

您看到的是代码命中 //3

当您首次提交任务时,线程数将小于最大池大小。第一轮提交的任务将到达 //2

第一次迭代后, Activity 线程的数量将是最大池大小,代码将尝试提交到 //1

假设主线程非常非常快地将 3 个项目放入队列,因此 ThreadPool 中的 4 个线程无法足够快地取出一个。如果发生这种情况,我们将传递第一个 if 语句(因为队列中没有可用性)并转到 else。由于已达到最大池大小,因此除了reject 别无他法。

这可以通过检查 ThreadPoolExecutor Javadocs 来进一步解释。 .

If a request cannot be queued, a new thread is created unless this would exceed maximumPoolSize, in which case, the task will be rejected.

以后

Direct handoffs generally require unbounded maximumPoolSizes to avoid rejection of new submitted tasks. This in turn admits the possibility of unbounded thread growth when commands continue to arrive on average faster than they can be processed.

要解决您的问题,您有两个合理的选择:

  1. 使用 SynchronousQueue .提供给 SynchronousQueue 的线程将无限期地等待,直到另一个线程获取该项目(如果它知道另一个线程正在等待接收它)。您定义的固定队列大小将导致主线程在放置不成功时返回(不阻塞)(即,另一个线程不会立即将其取消)。要使用 Spring 使用 SynchronousQueue,请将队列容量设置为零。 设置队列容量(0)。 同样来自 Javadocs

    A good default choice for a work queue is a SynchronousQueue that hands off tasks to threads without otherwise holding them.

  2. 将队列大小设置为大于或等于您希望提交的并发任务数。队列的大小一般不会达到那个大小,但它会在将来保护你。

关于java - 为什么使用的线程数高于要求?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56431221/

相关文章:

java - 执行器 - 需要 LinkedBlockingQueue

java - 创建装饰器,如何获取eclipse插件中的文件信息?

java - 通过 SMTP 发送 ICS 文件无法在 Outlook 中正确显示?

java - 在 Java 下的 Saxon 中加载和验证 XML 文件和架构的示例代码

java - 在不知道属性名称和属性数量的情况下处理 json 对象

java - 使用 SwingWorker 在 GUI 中添加进度条

java - 在 ExecutorService 的 newFixedThreadPool() 中使用队列?

Java线程: Running a simple java thread program output confusion

Spring Integration 在聚合后调用另一个处理程序方法

java - RestTemplate 和 ResponseErrorHandler : Elegant means of handling errors given an indeterminate return object