java - 并行流中的 Fork-Join 池

标签 java multithreading java-8 java-stream

我在网上搜索了各种文章和 Stack Overflow 问题,但找不到完美的答案。许多问题都与此接近,但略有不同。

We know Java 8 Streams API uses Fork-Join Pool internally.

现在我的问题是如何使用 Fork-Join 池划分流管道中的任务?

假设我们有以下内容:

List myList =  inputList.parallelStream().filter( x -> x>0 )
    .map(x -> x+100 ).collect(Collectors.toList());

现在我们有两种使用线程池划分任务的选择。

  1. filtermap作为一个任务,使用fork-join pool运行。
  2. filtermap 作为两个不同的任务,并使用两个不同的fork-join 线程池运行它们。

另外我知道流是延迟传播的,所以如果我们在两者之间有一个有状态的中间操作:

List myList2 = inputList.parallelStream().filter( x -> x>0 )
    .map(x -> x+5 ).sorted().map(x -> x+5 ).collect(Collectors.toList());

那么如何创建线程池呢?

PS:之前知道map函数可以组合。我只是想为这个问题举个例子。

最佳答案

首先,您必须使用parallel 来激活Fork-Join PoolThis answer稍微解释一下 Spliterator 是如何执行拆分的;但简单来说,拆分是使用流元素的来源完成的,整个管道是并行处理的。在您的示例中,它是 filter and map (当然它还包括 terminal 操作)。

对于有状态操作——事情要复杂得多。让我们以 distinct 为例,首先看看它如何处理顺序情况。

一般来说,您可能会认为非并行 distinct 可以使用HashSet 来实现——您是对的。 HashSet 可以保存所有已经看到的值,并且不处理(发送到下一个操作)其他元素 - 理论上你可以用非并行的方式完成distinct 操作。但是,如果已知 StreamSORTED 怎么办?想一想,这意味着我们可以保留一个标记为 seen 的元素(与以前的 HashSet 不同)。基本上如果你有:

 1,1,2,2,3

这意味着您的有状态操作可以在单个元素之上实现——而不是HashSet;代码会是这样的:

T seen = null;
....
if(seen == null || (!currentElement.equals(seen)){
    seen = currentElement;
    // process seen;
}

但是这种优化只有在你知道流是SORTED时才有可能,因为这样你就知道下一个元素要么与你已经看到的相同,要么是一个新元素,这对您来说是不可能在之前的其他操作中看到的——这是由排序操作保证的。

现在 parallel distinct 是如何实现的。你基本上会问这个问题:

Then how will the thread-pools be created

同样,从 Stream 的角度来看没有任何变化,ForkJoinPool 使用相同数量的线程 - 显然,唯一改变的是流实现。

简单来说,如果您的StreamORDERED,则内部实现使用LinkedHashSet(实际上是它的多个实例,因为它确实在这种情况下减少)以保留您的订单,如果您不关心订单,它会使用 ConcurrentHashMap - 如果源未订购(如 Set) 或者你使用显式调用 unordered。如果您真的想知道它是如何完成的,您也可以查看 sorted 的实现。


所以底线是 Fork Join Pool 不会更改基于流的实现,它使用相同的模型。另一方面,根据您拥有的操作,Stream API 可能会使用一些有状态数据来进行有状态中间操作,例如 HashSet/ConcurrentHashMap 或单个元素等。

关于java - 并行流中的 Fork-Join 池,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52915530/

相关文章:

java - 使用扫描仪和分隔符

java - 与 Thread.sleep 相比,new Thread().sleep 在 CPU 和内存利用率方面有多差?

java - 使用两个线程一次打印一个字母和数字

java-8 - 从 map 创建 map

java - 如何使用log4j2记录log4j日志

java - 红外遥控应用程序如何工作?

multithreading - Rust:使用互斥体允许从多个线程访问数据

Java 8 列表到嵌套映射

Java 8 方法引用 : validation of methods at compile time

Java 方法 split() 不适用于逗号,数组越界