Java 不使用所有可用的 CPU

标签 java multithreading concurrency java-8 fork-join

我有一个长时间运行的计算,我需要对一长串输入进行计算。计算是独立的,所以我想把它们分发到几个 CPU 上。我正在使用 Java 8。

代码的框架是这样的:

ExecutorService executorService = Executors.newFixedThreadPool(numThreads);

MyService myService = new MyService(executorService);

List<MyResult> results =
            myInputList.stream()
                     .map(myService::getResultFuture)
                     .map(CompletableFuture::join)
                     .collect(Collectors.toList());

executorService.shutdown();

负责计算的主要函数如下所示:

CompletableFuture<MyResult> getResultFuture(MyInput input) {
    return CompletableFuture.supplyAsync(() -> longCalc(input), executor)))
}

长时间运行的计算是无状态的,不做任何 IO。

我希望此代码使用所有可用的 CPU,但它并没有发生。例如,在具有 72 个 CPU 和 numThreads=72(甚至例如 numThreads=500)的机器上,cpu 使用率最多为 500-1000%,如下所示顶层:

htop

根据线程转储,许多计算线程正在等待,即:

"pool-1-thread-34" #55 prio=5 os_prio=0 tid=0x00007fe858597890 nid=0xd66 waiting on condition [0x00007fe7f9cdd000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x0000000381815f20> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
    at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
    - None

所有计算线程都在等待同一个锁。在转储时,只有 5 个计算线程是 RUNNABLE,其余的都是 WAITING。

锁定的原因是什么?为什么我无法使用所有 CPU?

最佳答案

您正在提交作业并立即调用 join(),等待异步作业完成。

Stream 中间步骤是按元素执行的,这意味着中间步骤 .map(CompletableFuture::join) 一次在一个元素上运行(更糟糕的是它是一个顺序流),无需确保所有元素都已通过提交步骤。这会导致线程在等待每个计算完成时阻塞。

在开始对它们调用 join() 之前,您必须强制提交所有作业:

List<MyResult> results =
    myInputList.stream()
               .map(myService::getResultFuture)
               .collect(Collectors.toList()).stream()
               .map(CompletableFuture::join)
               .collect(Collectors.toList());

如果你可以将你想对 results 列表做的任何事情表达为当一切都完成时调用的 Action ,你可以用 以不阻塞线程的方式实现操作>加入():

List<CompletableFuture<MyResult>> futures = myInputList.stream()
    .map(myService::getResultFuture)
    .collect(Collectors.toList());
CompletableFuture.allOf(futures.toArray(CompletableFuture<?>[]::new))
    .thenRun(() -> {
        List<MyResult> results = futures.stream()
            .map(CompletableFuture::join)
            .collect(Collectors.toList());
        // perform action with results
    });

它仍然调用 join() 来检索结果,但是此时,所有 future 都已完成,因此调用者不会被阻塞。

关于Java 不使用所有可用的 CPU,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49819369/

相关文章:

java - 网络适​​配器无法建立连接

Java Web 服务请求-响应问题

c - C程序中线程内的多线程

c# - OpenTK 使用二次幂位图纹理将文本写入屏幕

Go range over channel 死锁问题,我应该关闭 channel 吗?

java - 为什么我的应用程序在我单击按钮后就会关闭?

java - 如何在不渲染所有内容的情况下使用 j2html

c++ - 使用来自 CreateFile 的有效句柄来自 ReadFileEx 的无效句柄错误

.net - MS Access数据库的自动压缩和修复

java - `this` 引用外部类如何通过发布内部类实例转义?