java - 为什么不能并行减少流流?/流已经被操作或关闭

标签 java parallel-processing java-8 java-stream

上下文

我偶然发现了一个相当烦人的问题:我有一个程序,它有很多数据源,能够流式传输相同类型的元素,我想“映射”程序中的每个可用元素(元素顺序没关系)。

因此我尝试减少我的 Stream<Stream<T>> streamOfStreamOfT;变成一个简单的Stream<T> streamOfT;使用 streamOfT = streamOfStreamOfT.reduce(Stream.empty(), Stream::concat);

因为元素顺序对我来说并不重要,所以我尝试使用 .parallel() 并行化 reduce 操作。 : streamOfT = streamOfStreamOfT.parallel().reduce(Stream.empty(), Stream::concat);但这会触发 java.lang.IllegalStateException: stream has already been operated upon or closed

例子

要亲自体验,只需通过评论/取消评论 .parallel() 来玩以下主要 (java 1.8u20)

public static void main(String[] args) {
    // GIVEN
    List<Stream<Integer>> listOfStreamOfInts = new ArrayList<>();
    for (int j = 0; j < 10; j++) {
        IntStream intStreamOf10Ints = IntStream.iterate(0, i -> i + 1)
                .limit(10);
        Stream<Integer> genericStreamOf10Ints = StreamSupport.stream(
                intStreamOf10Ints.spliterator(), true);
        listOfStreamOfInts.add(genericStreamOf10Ints);
    }
    Stream<Stream<Integer>> streamOfStreamOfInts = listOfStreamOfInts
            .stream();
    // WHEN
    Stream<Integer> streamOfInts = streamOfStreamOfInts
            // ////////////////
            // PROBLEM
            //    |
            //    V
            .parallel()
            .reduce(Stream.empty(), Stream::concat);

    // THEN
    System.out.println(streamOfInts.map(String::valueOf).collect(
            joining(", ")));
}

问题

有人可以解释这个限制吗? /找到一种更好的方法来处理流的并行减少


编辑 1

在@Smutje 和@LouisWasserman 评论之后,似乎 .flatMap(Function.identity())是容忍 .parallel() 的更好选择流

最佳答案

您使用的reduce 形式采用标识 和关联组合函数。但是 Stream.empty() 不是一个值;它有状态。流不是数组或集合之类的数据结构;它们是通过可能并行的聚合操作推送数据的载体,并且它们具有某种状态(例如流是否已被消耗)。想想这是如何工作的;您将构建一棵树,其中相同的“空”流出现在多个叶子中。当您尝试两次使用此有状态的非身份(不会按顺序发生,但会并行发生)时,您第二次尝试遍历该空流时,将非常正确地看到它已被使用.

所以问题是,您只是错误地使用了这个 reduce 方法。问题不在于并行性;只是并行性暴露了潜在的问题。

其次,即使这按照您认为应该的方式“工作”,您也只能并行构建表示扁平流的树;当您进行连接时,那里是一个顺序流管道。糟糕。

第三,即使这按照您认为应该的方式“工作”,您将通过构建串联流来增加大量元素访问开销,并且您不会获得并行性的好处你在寻找。

简单的答案是展平流:

String joined = streamOfStreams.parallel()
                               .flatMap(s -> s)
                               .collect(joining(", "));

关于java - 为什么不能并行减少流流?/流已经被操作或关闭,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25412377/

相关文章:

java - 每个模块(在多模块 Maven 项目中)是否应该有自己专用的异常?

Java:当您不需要任何分隔符时如何拆分字符串

c++ - 并行实例化类导致与内存相关的错误

c - 什么时候应该忽略关键部分以及什么时候不需要等待?开放式MP

java-8 - 使用 Java 8 流将枚举转换为设置

java - 服务帐户在本地工作但在应用引擎上部署时中断的 Google Drive Auth

java - JBoss 通过传递键来检查 HashMap 的值

c++ - OpenMP:为什么这些 omp 并行部分不使用多线程执行?

java - HashMap resize方法实现细节

Java:如何构造或模式化一个类,以便我们可以在运行时映射组合对象?