java - 最后处理某些流元素

标签 java concurrency java-8 java-stream

我有一个我必须实现的接口(interface),它需要一个 Stream 响应。我的源中的某些元素缺少数据,我必须使用源中的其他元素来查找它。它太大,无法在内存中容纳所有元素。我可以编写一个例程来查找丢失的数据,但前提是我最后处理丢失数据的元素。

这是我尝试解决此问题的一个简化示例。在这种情况下,我试图在 addOne 的附加例程之后保存 30 个元素以在最后处理。但是当程序尝试从列表流中读取时,我收到了一个 ConcurrentModificationException。

package test;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

public class TestStreams {
    private static List<Integer> savedForLater = new ArrayList<>();

    public static void main(String[] args) {
        Stream<Integer> origStream = Stream.of(10, 20, 30, 40, 50).filter(
                i -> saveThirtyForLater(i));
        Stream<Integer> savedForLaterStream = savedForLater.stream().map(
                i -> addOne(i));

        // Exception
        Stream.concat(origStream, savedForLaterStream).forEach(
            i -> System.out.println(i));

        // No Exception
        // origStream.forEach(i -> System.out.println(i));
        // savedForLaterStream.forEach(i -> System.out.println(i));
    }

    private static Integer addOne(Integer i) {
        return new Integer(i + 1);
    }

    private static boolean saveThirtyForLater(Integer i) {
        if (i == 30) {
            savedForLater.add(i);
            return false;
        }
        return true;
    }
}

此代码产生以下结果:

10
20
40
50
Exception in thread "main" java.util.ConcurrentModificationException
    at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1380)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:512)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:502)
    at java.util.stream.StreamSpliterators$WrappingSpliterator.forEachRemaining(StreamSpliterators.java:312)
    at java.util.stream.Streams$ConcatSpliterator.forEachRemaining(Streams.java:742)
    at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:580)
    at test.TestStreams.main(TestStreams.java:17)

我尝试过使用线程安全列表,但它也没有产生预期的结果。

根据 JavaDoc Stream.concat 创建一个惰性 连接流,其元素是第一个流的所有元素,后跟第二个流的所有元素。

流上的连接不应该调用列表的流,直到它从列表中拉出一个对象,此时列表没有改变。

如果所有其他方法都失败了,我可以读取文件两次,但我真的很想知道为什么这不起作用,以及是否有人有关于操纵流以避免第二次读取的替代想法。

最佳答案

流是惰性的。除非你使用诸如 forEachcollect 之类的终端操作,否则中间操作(如 filtermap)将不会被执行。

Stream<Integer> origStream = Stream.of(10, 20, 30, 40, 50).filter(
        i -> saveThirtyForLater(i));

执行上面的代码行后,您的 savedForLater 列表保持不变。只有在您对该流使用终端操作后,它才会被修改。

在您的最终表达式 Stream.concat(origStream, savedForLaterStream).forEach(i -> System.out.println(i)); 中,您使用终端操作 forEach在流 origStreamsavedForLaterStream 上。前一个流将修改 savedForLater 列表,而后者实际上迭代它 - 这就是您得到 ConcurrentModificationException 的原因。

修改filter 方法中的字段是一种非常糟糕的做法,它实际上违反了filter 方法的约定。从它的 javadoc:

predicate - a non-interfering, stateless predicate to apply to each element to determine if it should be included

您的谓词 saveThirtyForLater 不是无状态的,因为它修改了 savedForLater 列表。

解决方案:

您可以单独处理这些流,一个接一个地处理这些流,而不是使用 concat:

origStream.forEach(i -> System.out.println(i));
savedForLaterStream.forEach(i -> System.out.println(i));

这些产生了预期的结果:

10
20
40
50
31

关于java - 最后处理某些流元素,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37277778/

相关文章:

java - Mac OS X 上的 SWT 菜单不显示

没有线程的并发

java - Java 中的并行任务

java - 从理论上讲,一个JAVA程序需要JRE编译吗?

Java,在字符r之前截断数组行。 X

java - 尽管有 'DB_CLOSE_ON_EXIT=FALSE',但 H2 内存中测试数据库已关闭

go - "selective"goroutines 互斥

java - 使用 java 8 转换数组列表 -> 映射

java - 如何更新 Java 8 EA (Windows 8)?

java - 如何声明 scala 方法以便可以使用可变参数样式从 Java 调用它