我有这段代码:
var blong = Stream.iterate(BigInteger.ZERO, bi -> bi.add(BigInteger.ONE))
.collect(Collector.of(
() -> Stream.of(),
(s, bi) -> Stream.concat(s, Stream.of(bi)),
(s1, s2) -> Stream.concat(s1, s2),
s -> s
));
System.out.println(blong.getClass().getName());
它无法正常工作。我收到 IllegalStateException
:
Exception in thread "main" java.lang.IllegalStateException: stream has already been operated upon or closed
at java.base/java.util.stream.AbstractPipeline.spliterator(AbstractPipeline.java:346)
at java.base/java.util.stream.Stream.concat(Stream.java:1618)
at UninitializedTest.lambda$2(UninitializedTest.java:28)
at java.base/java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
at java.base/java.util.stream.Stream$1.tryAdvance(Stream.java:1469)
at java.base/java.util.Spliterator.forEachRemaining(Spliterator.java:332)
at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682)
at UninitializedTest.main(UninitializedTest.java:27)
看来我的 Supplier
返回的 Stream
在创建后立即关闭。
即使我从一个空的List
创建一个Stream
或者一个带有一些实际BigInteger
数据的Stream
它,我得到同样的错误。
为什么我的信息流被关闭?
最佳答案
首先,让我们剖析一下您创建的自定义收集器:
Collector.of(
() -> Stream.of(), // Container - this object is meant accumulate the result by consuming stream elements (in this case it can't really change its state, but nominally its still a container)
(s, bi) -> Stream.concat(s, Stream.of(bi)), // Accumulator - is meant to define how stream elements should be accumulated in the container (again in this case we can't do anything with the container)
(s1, s2) -> Stream.concat(s1, s2), // Combiner - defines how containers should be merged while executing stream in parallel (NB: - we can replace it with a method reference Stream::concat)
s -> s // Finisher function - describes the final transformation which should be performed with container (NB: since it doesn't perform any action we can omit this argument, there's an overloaded version which doesn't expect Finisher)
)
首先值得指出的是,Stream不是数据容器(如集合)。
因此,提供一个空流 () -> Stream.of()
作为收集器的容器是一个错误 - 容器需要是可变的。但我们无法将元素插入空流中。
其次,您的自定义收集器的累加器( Collector.of()
的第二个参数)没有执行您可能期望的操作做。
累加器是 BiConsumer<R, T>
您已按如下方式实现了它:
(s, bi) -> Stream.concat(s, Stream.of(bi))
这里Stream.concat()
消耗流 s
中的所有元素和 Stream.of(bi)
返回的流并生成一个新的未命名流,它很快就会成为垃圾收集器的猎物。提醒:BiConsumer
不返回值,因此 concat()
返回的流消失了。
流s
仍然存在(意味着收集器知道它的引用),但在执行 concat()
时它已经被消耗了,即它关闭。它发生在第一个流元素 ( BigInteger.ZERO
) 获取进程时。当收集器尝试处理第二个元素时,您会收到异常,因为 concat()
尝试使用流 s
已经关闭了。
When the
Consumer
fires, I'm expecting theStream<Stream<BigInteger>>
to be consumed, returning aStream<BigInteger>
首先,BiConsumer
以及 Consumer
有一个 abstract
方法 accept()
这是 void
,它并不意味着返回任何内容。
It appears that the Streams being returned by my
Supplier
are being closed
其次,感觉您对 Collector 的工作方式有误解。在顺序执行场景中,可变容器的实例将仅创建一次(并且每个线程并行创建一个容器,除非您指定它是一个并发收集器通过提供 Collector.Characteristics.CONCURRENT
,在这种情况下所有线程将共享同一个容器)。
Container 应该是一个可变对象(否则它不会像您的情况那样有用),它的工作是累积流元素。 IE。 容器更改其状态,而收集器消耗流中的元素。
Stream.iterate(seed,f)
返回的流类型为 Stream<BigInteger>
.
由供应商生产的容器 () -> Stream.of()
类型为 Stream<Object>
因为编译器无法推断空流的类型,除非您使用类型见证(如 .<BigInteger>of()
)显式指定它.
因此,concat()
返回的流在累加器内部也将是 Stream<Object>
类型。并提醒该流将被忽略。
不会有像 Stream<Stream<BigInteger>>
这样的野兽任何地方,无论是在管道中,还是在收集器内。
最后,我要重申,将元素添加到 Stream
本质上是不可能的。 .
关于java - 为什么我的 `Stream` 在运行时被关闭?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/73451674/