我有一个我必须实现的接口(interface),它需要一个 Stream 响应。我的源中的某些元素缺少数据,我必须使用源中的其他元素来查找它。它太大,无法在内存中容纳所有元素。我可以编写一个例程来查找丢失的数据,但前提是我最后处理丢失数据的元素。
这是我尝试解决此问题的一个简化示例。在这种情况下,我试图在 addOne 的附加例程之后保存 30 个元素以在最后处理。但是当程序尝试从列表流中读取时,我收到了一个 ConcurrentModificationException。
package test;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;
public class TestStreams {
private static List<Integer> savedForLater = new ArrayList<>();
public static void main(String[] args) {
Stream<Integer> origStream = Stream.of(10, 20, 30, 40, 50).filter(
i -> saveThirtyForLater(i));
Stream<Integer> savedForLaterStream = savedForLater.stream().map(
i -> addOne(i));
// Exception
Stream.concat(origStream, savedForLaterStream).forEach(
i -> System.out.println(i));
// No Exception
// origStream.forEach(i -> System.out.println(i));
// savedForLaterStream.forEach(i -> System.out.println(i));
}
private static Integer addOne(Integer i) {
return new Integer(i + 1);
}
private static boolean saveThirtyForLater(Integer i) {
if (i == 30) {
savedForLater.add(i);
return false;
}
return true;
}
}
此代码产生以下结果:
10
20
40
50
Exception in thread "main" java.util.ConcurrentModificationException
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1380)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:512)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:502)
at java.util.stream.StreamSpliterators$WrappingSpliterator.forEachRemaining(StreamSpliterators.java:312)
at java.util.stream.Streams$ConcatSpliterator.forEachRemaining(Streams.java:742)
at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:580)
at test.TestStreams.main(TestStreams.java:17)
我尝试过使用线程安全列表,但它也没有产生预期的结果。
根据 JavaDoc Stream.concat 创建一个惰性 连接流,其元素是第一个流的所有元素,后跟第二个流的所有元素。
流上的连接不应该调用列表的流,直到它从列表中拉出一个对象,此时列表没有改变。
如果所有其他方法都失败了,我可以读取文件两次,但我真的很想知道为什么这不起作用,以及是否有人有关于操纵流以避免第二次读取的替代想法。
最佳答案
流是惰性的。除非你使用诸如 forEach
或 collect
之类的终端操作,否则中间操作(如 filter
或 map
)将不会被执行。
Stream<Integer> origStream = Stream.of(10, 20, 30, 40, 50).filter(
i -> saveThirtyForLater(i));
执行上面的代码行后,您的 savedForLater
列表保持不变。只有在您对该流使用终端操作后,它才会被修改。
在您的最终表达式 Stream.concat(origStream, savedForLaterStream).forEach(i -> System.out.println(i));
中,您使用终端操作 forEach
在流 origStream
和 savedForLaterStream
上。前一个流将修改 savedForLater
列表,而后者实际上迭代它 - 这就是您得到 ConcurrentModificationException
的原因。
修改filter
方法中的字段是一种非常糟糕的做法,它实际上违反了filter
方法的约定。从它的 javadoc:
predicate - a non-interfering, stateless predicate to apply to each element to determine if it should be included
您的谓词 saveThirtyForLater
不是无状态的,因为它修改了 savedForLater
列表。
解决方案:
您可以单独处理这些流,一个接一个地处理这些流,而不是使用 concat
:
origStream.forEach(i -> System.out.println(i));
savedForLaterStream.forEach(i -> System.out.println(i));
这些产生了预期的结果:
10
20
40
50
31
关于java - 最后处理某些流元素,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37277778/