java - ForkJoinPool 大小动态增加?

标签 java multithreading parallel-processing java-stream forkjoinpool

相关:CompletableFuture on ParallelStream gets batched and runs slower than sequential stream?
我正在研究通过 parallelStream 和 CompletableFutures 并行化网络调用的不同方式。因此,我遇到了这种情况,Java 的 parallelStream 使用的 ForkJoinPool.commonPool() 的大小动态增长,从 ~ #Cores 到最大值 64。
java 细节:$ java -version

openjdk version "11.0.10" 2021-01-19
OpenJDK Runtime Environment AdoptOpenJDK (build 11.0.10+9)
OpenJDK 64-Bit Server VM AdoptOpenJDK (build 11.0.10+9, mixed mode)
显示这种行为的代码如下(完整的可执行代码 here)

    public static int loops = 100;
    private static long sleepTimeMs = 1000;
    private static ExecutorService customPool = Executors.newFixedThreadPool(loops);




    // this method shows dynamic increase in pool size
    public static void m1() {
        Instant start = Instant.now();
        LongSummaryStatistics stats = LongStream.range(0, loops).boxed()
                .parallel()
                .map(number -> CompletableFuture.supplyAsync(
                        () -> DummyProcess.slowNetworkCall(number), customPool))
                .map(CompletableFuture::join)
                .mapToLong(Long::longValue)
                .summaryStatistics();

    }

    // this method shows static pool size
    public static void m2() {
        Instant start = Instant.now();
        LongSummaryStatistics stats = LongStream.range(0, loops)
                .parallel()
                .map(DummyProcess::slowNetworkCall) // in this call, parallelism/poolsize stays constant 11
                .summaryStatistics();
    }


    public static Long slowNetworkCall(Long i) {
        Instant start = Instant.now();
        // starts with 11 (#cores in my laptop = 12), goes upto 64
        log.info(" {} going to sleep. poolsize: {}", i, ForkJoinPool.commonPool().getPoolSize());
        try {
            TimeUnit.MILLISECONDS.sleep(sleepTimeMs);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        log.info(" {} woke up..", i);
        return Duration.between(start, Instant.now()).toMillis();
    }

样本输出:
16:07:17.443 [pool-2-thread-7] INFO  generalworks.parallelism.DummyProcess -  44 going to sleep. poolsize: 11
16:07:17.443 [pool-2-thread-9] INFO  generalworks.parallelism.DummyProcess -  7 going to sleep. poolsize: 12
16:07:17.443 [pool-2-thread-4] INFO  generalworks.parallelism.DummyProcess -  6 going to sleep. poolsize: 12
16:07:17.444 [pool-2-thread-13] INFO  generalworks.parallelism.DummyProcess -  82 going to sleep. poolsize: 13
16:07:17.444 [pool-2-thread-14] INFO  generalworks.parallelism.DummyProcess -  26 going to sleep. poolsize: 14
16:07:17.444 [pool-2-thread-15] INFO  generalworks.parallelism.DummyProcess -  96 going to sleep. poolsize: 15
16:07:17.445 [pool-2-thread-16] INFO  generalworks.parallelism.DummyProcess -  78 going to sleep. poolsize: 16
.
.
16:07:18.460 [pool-2-thread-79] INFO  generalworks.parallelism.DummyProcess -  2 going to sleep. poolsize: 64
16:07:18.460 [pool-2-thread-71] INFO  generalworks.parallelism.DummyProcess -  36 going to sleep. poolsize: 64
16:07:18.460 [pool-2-thread-74] INFO  generalworks.parallelism.DummyProcess -  77 going to sleep. poolsize: 64
16:07:18.461 [pool-2-thread-83] INFO  generalworks.parallelism.DummyProcess -  86 going to sleep. poolsize: 64
我了解公共(public)池中的线程数,即 parallelism是基于可用内核的最大数量,所以由于我的笔记本电脑有 12 个内核,我开始时的并行度为 11。但我不明白为什么它在一种方法中不断攀升,但在另一种方法中,它的大小保持不变

最佳答案

我相信你的答案是 here (ForkJoinPool 实现):

                        if ((wt = q.owner) != null &&
                            ((ts = wt.getState()) == Thread.State.BLOCKED ||
                             ts == Thread.State.WAITING))
                            ++bc;            // worker is blocking
在您的代码的一个版本中,您阻止 Thread.sleep ,将线程放入 TIMED_WAITING状态,而在另一个状态下,您阻止 CompletableFuture.join() ,将其放入 WAITING状态。实现区分这些并展示您观察到的不同行为。CompletableFuture里面还有特例代码这使它与 ForkJoinPool 合作为了防止在等待结果时挨饿:
            if (Thread.currentThread() instanceof ForkJoinWorkerThread)
                ForkJoinPool.helpAsyncBlocker(defaultExecutor(), q);
与您首先进行测试的原因相关的结论:Thread.sleep()不能正确模拟长时间的网络调用。如果您执行了实际操作或其他一些阻塞操作,它将通过扩展池来补偿。

关于java - ForkJoinPool 大小动态增加?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/67416858/

相关文章:

java - Constructor中如何处理Writer、Result、OutputStream?

java - 对于存储在 Oracle 数据库中的数据运行搜索查询,在 PL/SQL 中使用 REGEXP 是否比获取所有数据并在 Java 正则表达式中过滤它更快?

java - 意外的电子机器 : 3 in KaliumJNI for Android

java - 存储 X509AttributeCertificateHolder 对象,尤其是 attrCert 本地并打开它

postgresql - 带有 postgres 插入/复制锁定和阻塞的并行聚合

C# While 在工作线程中循环监听命令和响应

.net - 在 .NET 中通过 Process.Start 生成的进程会挂起线程

java - 2 秒后第二个线程

python - 如何将 Python 和 OpenCV 与多处理一起使用?

Python 并行子进程命令同时抑制输出