我在网上搜索了各种文章和 Stack Overflow 问题,但找不到完美的答案。许多问题都与此接近,但略有不同。
We know Java 8 Streams API uses Fork-Join Pool internally.
现在我的问题是如何使用 Fork-Join 池划分流管道中的任务?
假设我们有以下内容:
List myList = inputList.parallelStream().filter( x -> x>0 )
.map(x -> x+100 ).collect(Collectors.toList());
现在我们有两种使用线程池划分任务的选择。
- 将
filter
和map
作为一个任务,使用fork-join pool运行。 - 将
filter
和map
作为两个不同的任务,并使用两个不同的fork-join 线程池运行它们。
另外我知道流是延迟传播的,所以如果我们在两者之间有一个有状态的中间操作:
List myList2 = inputList.parallelStream().filter( x -> x>0 )
.map(x -> x+5 ).sorted().map(x -> x+5 ).collect(Collectors.toList());
那么如何创建线程池呢?
PS:之前知道map函数可以组合。我只是想为这个问题举个例子。
最佳答案
首先,您必须使用parallel
来激活Fork-Join Pool
。 This answer稍微解释一下 Spliterator
是如何执行拆分的;但简单来说,拆分是使用流元素的来源完成的,整个管道是并行处理的。在您的示例中,它是 filter
and map
(当然它还包括 terminal
操作)。
对于有状态操作——事情要复杂得多。让我们以 distinct
为例,首先看看它如何处理顺序情况。
一般来说,您可能会认为非并行
distinct
可以使用HashSet
来实现——您是对的。 HashSet
可以保存所有已经看到的值,并且不处理(发送到下一个操作)其他元素 - 理论上你可以用非并行的方式完成distinct
操作。但是,如果已知 Stream
是 SORTED
怎么办?想一想,这意味着我们可以保留一个标记为 seen
的元素(与以前的 HashSet
不同)。基本上如果你有:
1,1,2,2,3
这意味着您的有状态操作可以在单个元素之上实现——而不是HashSet
;代码会是这样的:
T seen = null;
....
if(seen == null || (!currentElement.equals(seen)){
seen = currentElement;
// process seen;
}
但是这种优化只有在你知道流是SORTED
时才有可能,因为这样你就知道下一个元素要么与你已经看到的相同,要么是一个新元素,这对您来说是不可能在之前的其他操作中看到的——这是由排序操作保证的。
现在 parallel distinct
是如何实现的。你基本上会问这个问题:
Then how will the thread-pools be created
同样,从 Stream 的角度来看没有任何变化,ForkJoinPool
使用相同数量的线程 - 显然,唯一改变的是流实现。
简单来说,如果您的Stream
是ORDERED
,则内部实现使用LinkedHashSet
(实际上是它的多个实例,因为它确实在这种情况下减少)以保留您的订单,如果您不关心订单,它会使用 ConcurrentHashMap
- 如果源未订购(如 Set
) 或者你使用显式调用 unordered
。如果您真的想知道它是如何完成的,您也可以查看 sorted
的实现。
所以底线是 Fork Join Pool
不会更改基于流的实现,它使用相同的模型。另一方面,根据您拥有的操作,Stream API 可能会使用一些有状态数据来进行有状态中间操作,例如 HashSet/ConcurrentHashMap
或单个元素等。
关于java - 并行流中的 Fork-Join 池,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52915530/