假设我有这段代码:
Collections.singletonList(10)
.parallelStream() // .stream() - nothing changes
.flatMap(x -> Stream.iterate(0, i -> i + 1)
.limit(x)
.parallel()
.peek(m -> {
System.out.println(Thread.currentThread().getName());
}))
.collect(Collectors.toSet());
输出是相同的线程名称,因此这里的并行没有任何好处 - 我的意思是有一个线程可以完成所有工作。
在flatMap
中有这样的代码:
result.sequential().forEach(downstream);
我理解如果“外部”流是并行的(它们可能会阻塞),则强制使用“顺序”属性,“外部”必须等待“flatMap”完成,反之亦然(因为使用相同的公共(public)池)但是为什么总是强制这样做?
这是在以后的版本中可以改变的事情之一吗?
最佳答案
有两个不同的方面。
首先,只有一个管道,它要么是顺序的,要么是并行的。内部流的顺序或并行选择是无关紧要的。请注意,您在引用的代码片段中看到的 downstream
消费者代表整个后续流管道,因此在以 .collect(Collectors.toSet());
结尾的代码中,该消费者最终会将结果元素添加到非线程安全的单个 Set
实例中。因此,与单个消费者并行处理内部流会破坏整个操作。
如果外部流被分割,引用的代码可能会被添加到不同集合的不同使用者同时调用。这些调用中的每一个都会处理映射到不同内部流实例的外部流的不同元素。由于您的外部流仅由单个元素组成,因此无法拆分。
这个已经实现的方式,也是Why filter() after flatMap() is “not completely” lazy in Java streams?的原因问题,因为 forEach
在内部流上被调用,它将把所有元素传递给下游消费者。如 this answer 所示,支持惰性和子流分割的替代实现是可能的。但这是一种根本不同的实现方式。 Stream 实现的当前设计主要通过消费者组合来工作,因此最终,源分割器(以及从中分割出来的那些)在 tryAdvance
或 forEachRemaining
中接收代表整个流管道的 Consumer
。相反,链接答案的解决方案进行了 spliterator 组合,生成一个新的 Spliterator
委托(delegate)给源 spliterator。我想,这两种方法都有优点,但我不确定,如果反过来的话,OpenJDK 实现会损失多少。
关于java - 并行的 flatMap 总是顺序的,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49860499/