具有调用者工作窃取的自定义池的 Java parallelStream()?

标签 java java-8 java-stream fork-join forkjoinpool

通常当使用 Java 8 的 parallelStream() 时,结果是通过默认的、通用的 fork-join 池(即 ForkJoinPool.commonPool())执行。

这显然是不可取的,但是,如果一个人的工作远非 CPU 限制,例如可能大部分时间都在等待 IO。在这种情况下,人们会希望使用一个单独的池,其大小根据其他标准(例如,任务实际使用 CPU 的时间可能有多少)。

没有显而易见的方法让 parallelStream() 使用不同的池,但有一种方法,详述 here .

不幸的是,该方法需要从 fork-join 池线程调用并行流上的终端操作。这样做的缺点是,如果 target-fork 连接池完全忙于现有工作,整个执行将等待它,而什么都不做。因此,池可能成为比单线程执行更糟糕的瓶颈。相比之下,当以“正常”方式使用 parallelStream() 时,ForkJoinPool.common.externalHelpComplete() 或 ForkJoinPool.common.tryExternalUnpush() 用于让池外的调用线程帮助处理。

有没有人知道两者让 parallelStream() 使用非默认的 fork-join 池的方法并且有一个来自 fork-join 外部的调用线程pool 帮助处理这项工作(但不是 fork-join pool 的其余工作)?

最佳答案

您可以在池上使用 awaitQuiescence 来帮忙。但是,您无法选择要帮助的任务,它只会从池中取出下一个待处理的任务,因此,如果有更多待处理的任务,您可能会在完成自己的任务之前先执行这些任务。

ForkJoinPool forkJoinPool = new ForkJoinPool(1);
// make all threads busy:
forkJoinPool.submit(() -> LockSupport.parkNanos(Long.MAX_VALUE));
// submit our task (may contain your stream operation)
ForkJoinTask<Thread> task = forkJoinPool.submit(() -> Thread.currentThread());
// help out
while(!task.isDone()) // use zero timeout to execute one task only
    forkJoinPool.awaitQuiescence(0, TimeUnit.NANOSECONDS);
System.out.println(Thread.currentThread()==task.get());

将打印true

鉴于

ForkJoinPool forkJoinPool = new ForkJoinPool(1);
// make all threads busy:
forkJoinPool.submit(() -> LockSupport.parkNanos(Long.MAX_VALUE));
// overload:
forkJoinPool.submit(() -> LockSupport.parkNanos(Long.MAX_VALUE));
// submit our task (may contain your stream operation)
ForkJoinTask<Thread> task = forkJoinPool.submit(() -> Thread.currentThread());
// help out
while(!task.isDone())
    forkJoinPool.awaitQuiescence(0, TimeUnit.NANOSECONDS);
System.out.println(Thread.currentThread()==task.get());

在尝试执行第二个阻塞任务时将永远挂起。

不过,它会让启动线程帮助处理池中的待处理任务,这将增加它自己的任务被执行的机会,只要没有无限任务(上面的例子是极端的,只是为了演示而选择的)。


但请注意,Fork/Join 框架和 Stream API 之间的整个关系无论如何都是一个实现细节。

关于具有调用者工作窃取的自定义池的 Java parallelStream()?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32891610/

相关文章:

java - 使用 Maven 创建控制台应用程序时最好的原型(prototype)是什么?

java - 通过使用 Java 8 流对其进行排序将集合转换为 Map

java - Java 流阶段是连续的吗?

java - 推断泛型类中的参数类型,这些参数类型是所提供参数的嵌套泛型类型

java - 使用 Java Stream 查找最大公约数,无需递归 while/for 循环

java-8 - java 8 流干扰与非干扰

java - 使用流从 map 中求和值并收集到列表

java - 将字节数组大小减少到原始大小的一半

java - 如果我没有将元素放入 Map 中,如何从 Map 中获取元素?

java - Gradle JAR 添加到类路径