java - 什么是 ParallelStream 队列行为?

标签 java multithreading forkjoinpool

我正在使用parallelStream并行上传一些文件,有些是大文件,有些是小文件。我注意到并非所有 worker 都被使用。

一开始一切都运行良好,所有线程都被使用(我将并行度选项设置为 16)。然后在某一时刻(一旦到达更大的文件),它只使用一个线程

简化代码:

files.parallelStream().forEach((file) -> {
    try (FileInputStream fileInputStream = new FileInputStream(file)) {
                IDocumentStorageAdaptor uploader = null;

                try {
                    logger.debug("Adaptors before taking: " + uploaderPool.size());
                    uploader = uploaderPool.take();
                    logger.debug("Took an adaptor!");
                    logger.debug("Adaptors after taking: " + uploaderPool.size());
                    uploader.addNewFile(file);
                } finally {
                    if (uploader != null) {
                        logger.debug("Adding one back!");
                        uploaderPool.put(uploader);
                        logger.debug("Adaptors after putting: " + uploaderPool.size());
                    }
                }
            } catch (InterruptedException | IOException e) {
                throw new UploadException(e);
            }
});

uploaderPool 是一个 ArrayBlockingQueue。 日志:

[ForkJoinPool.commonPool-worker-8] - Adaptors before taking: 0
[ForkJoinPool.commonPool-worker-15] - Adding one back!
[ForkJoinPool.commonPool-worker-8] - Took an adaptor!
[ForkJoinPool.commonPool-worker-15] - Adaptors after putting: 0
...
...
...
[ForkJoinPool.commonPool-worker-10] - Adding one back!
[ForkJoinPool.commonPool-worker-10] - Adaptors after putting: 16
[ForkJoinPool.commonPool-worker-10] - Adaptors before taking: 16
[ForkJoinPool.commonPool-worker-10] - Took an adaptor!
[ForkJoinPool.commonPool-worker-10] - Adaptors after taking: 15
[ForkJoinPool.commonPool-worker-10] - Adding one back!
[ForkJoinPool.commonPool-worker-10] - Adaptors after putting: 16
[ForkJoinPool.commonPool-worker-10] - Adaptors before taking: 16
[ForkJoinPool.commonPool-worker-10] - Took an adaptor!
[ForkJoinPool.commonPool-worker-10] - Adaptors after taking: 15

似乎所有工作(列表中的项目)都分布在 16 个线程中,委托(delegate)给一个线程的事情只会等到该线程可以自由工作,而不是使用可用的线程。有没有办法改变parallelStream的工作队列方式?我阅读了 forkjoinpool 文档,它提到了工作窃取,但仅限于生成的子任务。

我的另一个计划可能是随机化我正在使用并行流的列表的排序,也许这会平衡事情。

谢谢!

最佳答案

并行流的分割与计算启发式方法针对数据并行操作进行调整,而不是针对 IO 并行操作。 (换句话说,它们被调整为保持 CPU 繁忙,但不会生成比 CPU 多得多的任务。)因此,它们偏向于计算而不是 fork 。目前没有选项可以覆盖这些选择。

关于java - 什么是 ParallelStream 队列行为?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52172342/

相关文章:

java - 抽屉导航有奇怪的行为

java - 如何在 x 秒后处理 JOptionPane

python - 主程序结束时如何终止线程?

c# - 使用列表作为线程启动例程的参数时出现索引超出范围错误

java - 下面的递归任务的实现是否正确?

scalability - Akka-在负载测试期间,以20%的cpu时间扫描forkjoinpool.scan

java - ForkJoinTask 与 CompletableFuture

java - Java 中传递 ActionListener,pack()

java - Python 等效于 java PBKDF2WithHmacSHA1

java - 通过组合组合框中的值和序列计数来创建字符串