java - 为什么 ThreadPoolExecutor 会在 keepAliveTime 之后将线程减少到 corePoolSize 以下?

标签 java multithreading threadpool java-6 keep-alive

我一直在研究使用 ThreadPoolExecutor 和 JDK6 进行线程池的不同策略。我有一个优先级队列在工作,但不确定我是否喜欢在 keepAliveTime 之后池没有调整大小的方式(无界队列得到的结果)。因此,我正在查看使用 LinkedBlockingQueue 和 CallerRuns 策略的 ThreadPoolExecutor。

我现在遇到的问题是池增加,正如文档所解释的那样,但是在任务完成并且 keepAliveTime 开始运行后,getPoolSize 显示池减少到零。下面的示例代码应该让您了解我的问题的基础:

public class ThreadPoolingDemo {
    private final static Logger LOGGER =
         Logger.getLogger(ThreadPoolingDemo.class.getName());

    public static void main(String[] args) throws Exception {
        LOGGER.info("MAIN THREAD:starting");
        runCallerTestPlain();   
    }

    private static void runCallerTestPlain() throws InterruptedException {
        //10 core threads, 
        //50 max pool size, 
        //100 tasks in queue, 
        //at max pool and full queue - caller runs task
        ThreadPoolExecutor tpe = new ThreadPoolExecutor(10, 50,
            5L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(100),
            new ThreadPoolExecutor.CallerRunsPolicy());

        //dump 5000 tasks on the queue
        for (int i = 0; i < 5000; i++) {
            tpe.submit(new Runnable() {
                @Override
                public void run() {
                    //just to eat some time and give a little feedback
                    for (int j = 0; j < 20; j++) {
                        LOGGER.info("First-batch Task, looping:" + j + "["
                               + Thread.currentThread().getId() + "]");
                    }
                }
            }, null);
        }
        LOGGER.info("MAIN THREAD:!!Done queueing!!");

        //check tpe statistics forever
        while (true) {
            LOGGER.info("Active count: " + tpe.getActiveCount() + " Pool size: "
                 + tpe.getPoolSize() + " Largest Pool: " + tpe.getLargestPoolSize());
            Thread.sleep(1000);
        }
    }
}

我发现了一个似乎是这个问题但已关闭的旧错误:http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6458662 .这是否仍然存在于 1.6 中,还是我遗漏了什么?

看起来我 Rubber Ducked 这个 ( http://www.codinghorror.com/blog/2012/03/rubber-duck-problem-solving.html )。我上面链接的错误与此相关:http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6576792 ,问题似乎在 1.7 中得到解决(我加载了 1.7 并验证 - 已修复......)。我想我的主要问题是这个基本问题存在了将近十年的错误。我花了太多时间写这篇文章,现在没有发布,希望它能帮助别人。

最佳答案

... after the tasks complete and the keepAliveTime comes into play getPoolSize shows the pool getting reduced to zero.

所以这看起来是 ThreadPoolExecutor 中的竞争条件。我猜它正在按照设计工作,尽管不是预期的。在工作线程循环从阻塞队列中获取任务的 getTask() 方法中,您会看到以下代码:

if (state == SHUTDOWN)  // Help drain queue
    r = workQueue.poll();
else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
    r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
else
    r = workQueue.take();
if (r != null)
    return r;
if (workerCanExit()) {
    if (runState >= SHUTDOWN) // Wake up others
        interruptIdleWorkers();
    return null;
}

如果 poolSize 增长到 corePoolSize 以上,那么如果轮询在 keepAliveTime 之后超时,代码将下降到 workerCanExit( ) 因为 rnull。所有线程都可以从该方法返回 true,因为它只是测试 poolSize 的状态:

    mainLock.lock();
    boolean canExit;
    try {
        canExit = runState >= STOP ||
            workQueue.isEmpty() ||
            (allowCoreThreadTimeOut &&
             poolSize > Math.max(1, corePoolSize)); << test poolSize here
    } finally {
        mainLock.unlock();                         << race to workerDone() begins
    }

一旦返回 true,工作线程就会退出,然后 poolSize 会递减。如果所有工作线程同时进行该测试,那么它们将全部退出,因为 poolSize 的测试与 --poolSize 时工作线程停止之间的竞争> 发生。

令我吃惊的是竞争条件的一致性。如果您在下面的 run() 中的 sleep() 中添加一些随机化,那么您可以让一些核心线程不退出,但我认为竞争条件会更难被击中。


您可以在以下测试中看到此行为:

@Test
public void test() throws Exception {
    int before = Thread.activeCount();
    int core = 10;
    int max = 50;
    int queueSize = 100;
    ThreadPoolExecutor tpe =
            new ThreadPoolExecutor(core, max, 1L, TimeUnit.SECONDS,
                    new LinkedBlockingQueue<Runnable>(queueSize),
                    new ThreadPoolExecutor.CallerRunsPolicy());
    tpe.allowCoreThreadTimeOut(false);
    assertEquals(0, tpe.getActiveCount());
    // if we start 1 more than can go into core or queue, poolSize goes to 0
    int startN = core + queueSize + 1;
    // if we only start jobs the core can take care of, then it won't go to 0
    // int startN = core + queueSize;
    for (int i = 0; i < startN; i++) {
        tpe.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
    }
    while (true) {
        System.out.println("active = " + tpe.getActiveCount() + ", poolSize = " + tpe.getPoolSize()
                + ", largest = " + tpe.getLargestPoolSize() + ", threads = " + (Thread.activeCount() - before));
        Thread.sleep(1000);
    }
}

如果您将 run() 方法中的 sleep 行更改为如下所示:

private final Random random = new Random();
...
    Thread.sleep(100 + random.nextInt(100));

这将使竞争条件更难命中,因此一些核心线程仍将存在。

关于java - 为什么 ThreadPoolExecutor 会在 keepAliveTime 之后将线程减少到 corePoolSize 以下?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/9912372/

相关文章:

没有同步的C++多线程

java - 双重检查锁定作为反模式

java - newFixedThreadPool() 与 newCachedThreadPool()

java - java中的ThreadGroup相对于创建单独的线程有什么好处?

c# - 如何打开多个连接来下载单个文件?

c# - 使用信号量进行线程化

java - 使用数组将随机选择的图像添加到可单击的 GridView

java - 显示错误值的 Int 变量 - Android

java - 如何获取和设置纹理中的像素(LibGDX)

java - 如何从以 latin1 编码的结果集中以 UTF-8 编码字符串