上下文
我偶然发现了一个相当烦人的问题:我有一个程序,它有很多数据源,能够流式传输相同类型的元素,我想“映射”程序中的每个可用元素(元素顺序没关系)。
因此我尝试减少我的 Stream<Stream<T>> streamOfStreamOfT;
变成一个简单的Stream<T> streamOfT;
使用 streamOfT = streamOfStreamOfT.reduce(Stream.empty(), Stream::concat);
因为元素顺序对我来说并不重要,所以我尝试使用 .parallel()
并行化 reduce 操作。 : streamOfT = streamOfStreamOfT.parallel().reduce(Stream.empty(), Stream::concat);
但这会触发 java.lang.IllegalStateException: stream has already been operated upon or closed
例子
要亲自体验,只需通过评论/取消评论 .parallel()
来玩以下主要 (java 1.8u20)
public static void main(String[] args) {
// GIVEN
List<Stream<Integer>> listOfStreamOfInts = new ArrayList<>();
for (int j = 0; j < 10; j++) {
IntStream intStreamOf10Ints = IntStream.iterate(0, i -> i + 1)
.limit(10);
Stream<Integer> genericStreamOf10Ints = StreamSupport.stream(
intStreamOf10Ints.spliterator(), true);
listOfStreamOfInts.add(genericStreamOf10Ints);
}
Stream<Stream<Integer>> streamOfStreamOfInts = listOfStreamOfInts
.stream();
// WHEN
Stream<Integer> streamOfInts = streamOfStreamOfInts
// ////////////////
// PROBLEM
// |
// V
.parallel()
.reduce(Stream.empty(), Stream::concat);
// THEN
System.out.println(streamOfInts.map(String::valueOf).collect(
joining(", ")));
}
问题
有人可以解释这个限制吗? /找到一种更好的方法来处理流的并行减少
编辑 1
在@Smutje 和@LouisWasserman 评论之后,似乎 .flatMap(Function.identity())
是容忍 .parallel()
的更好选择流
最佳答案
您使用的reduce
形式采用标识值 和关联组合函数。但是 Stream.empty()
不是一个值;它有状态。流不是数组或集合之类的数据结构;它们是通过可能并行的聚合操作推送数据的载体,并且它们具有某种状态(例如流是否已被消耗)。想想这是如何工作的;您将构建一棵树,其中相同的“空”流出现在多个叶子中。当您尝试两次使用此有状态的非身份(不会按顺序发生,但会并行发生)时,您第二次尝试遍历该空流时,将非常正确地看到它已被使用.
所以问题是,您只是错误地使用了这个 reduce
方法。问题不在于并行性;只是并行性暴露了潜在的问题。
其次,即使这按照您认为应该的方式“工作”,您也只能并行构建表示扁平流的树;当您进行连接时,那里是一个顺序流管道。糟糕。
第三,即使这按照您认为应该的方式“工作”,您将通过构建串联流来增加大量元素访问开销,并且您不会获得并行性的好处你在寻找。
简单的答案是展平流:
String joined = streamOfStreams.parallel()
.flatMap(s -> s)
.collect(joining(", "));
关于java - 为什么不能并行减少流流?/流已经被操作或关闭,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25412377/