java - 为什么我的 `Stream` 在运行时被关闭?

标签 java java-stream collectors consumer

我有这段代码:

var blong = Stream.iterate(BigInteger.ZERO, bi -> bi.add(BigInteger.ONE))
    .collect(Collector.of(
        () -> Stream.of(),
        (s, bi) -> Stream.concat(s, Stream.of(bi)),
        (s1, s2) -> Stream.concat(s1, s2),
        s -> s
    ));

System.out.println(blong.getClass().getName());

它无法正常工作。我收到 IllegalStateException:

Exception in thread "main" java.lang.IllegalStateException: stream has already been operated upon or closed
    at java.base/java.util.stream.AbstractPipeline.spliterator(AbstractPipeline.java:346)
at java.base/java.util.stream.Stream.concat(Stream.java:1618)
at UninitializedTest.lambda$2(UninitializedTest.java:28)
at java.base/java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
at java.base/java.util.stream.Stream$1.tryAdvance(Stream.java:1469)
at java.base/java.util.Spliterator.forEachRemaining(Spliterator.java:332)
at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682)
at UninitializedTest.main(UninitializedTest.java:27)

看来我的 Supplier 返回的 Stream 在创建后立即关闭。

即使我从一个空的List创建一个Stream或者一个带有一些实际BigInteger数据的Stream它,我得到同样的错误。

为什么我的信息流被关闭?

最佳答案

首先,让我们剖析一下您创建的自定义收集器:

Collector.of(
    () -> Stream.of(),                          // Container - this object is meant accumulate the result by consuming stream elements (in this case it can't really change its state, but nominally its still a container)
    (s, bi) -> Stream.concat(s, Stream.of(bi)), // Accumulator - is meant to define how stream elements should be accumulated in the container (again in this case we can't do anything with the container)
    (s1, s2) -> Stream.concat(s1, s2),          // Combiner - defines how containers should be merged while executing stream in parallel (NB: - we can replace it with a method reference Stream::concat)
    s -> s                                      // Finisher function - describes the final transformation which should be performed with container (NB: since it doesn't perform any action we can omit this argument, there's an overloaded version which doesn't expect Finisher)
)

首先值得指出的是,Stream不是数据容器(如集合)。

因此,提供一个空流 () -> Stream.of()作为收集器容器是一个错误 - 容器需要是可变的。但我们无法将元素插入空流中。

其次,您的自定义收集器累加器( Collector.of() 的第二个参数)没有执行您可能期望的操作做。

累加器BiConsumer<R, T>您已按如下方式实现了它:

(s, bi) -> Stream.concat(s, Stream.of(bi))

这里Stream.concat()消耗流 s 中的所有元素和 Stream.of(bi) 返回的流并生成一个新的未命名流,它很快就会成为垃圾收集器的猎物。提醒:BiConsumer不返回值,因此 concat() 返回的流消失了。

s仍然存在(意味着收集器知道它的引用),但在执行 concat() 时它已经被消耗了,即它关闭。它发生在第一个流元素 ( BigInteger.ZERO ) 获取进程时。当收集器尝试处理第二个元素时,您会收到异常,因为 concat()尝试使用流 s已经关闭了。


When the Consumer fires, I'm expecting the Stream<Stream<BigInteger>> to be consumed, returning a Stream<BigInteger>

首先,BiConsumer以及 Consumer有一个 abstract方法 accept() 这是 void ,它并不意味着返回任何内容。

It appears that the Streams being returned by my Supplier are being closed

其次,感觉您对 Collector 的工作方式有误解。在顺序执行场景中,可变容器的实例将仅创建一次(并且每个线程并行创建一个容器,除非您指定它是一个并发收集器通过提供 Collector.Characteristics.CONCURRENT ,在这种情况下所有线程将共享同一个容器)。

Container 应该是一个可变对象(否则它不会像您的情况那样有用),它的工作是累积流元素。 IE。 容器更改其状态,而收集器消耗流中的元素。

Stream.iterate(seed,f) 返回的流类型为 Stream<BigInteger> .

供应商生产的容器 () -> Stream.of()类型为 Stream<Object>因为编译器无法推断空流的类型,除非您使用类型见证(如 .<BigInteger>of() )显式指定它.

因此,concat() 返回的流在累加器内部也将是 Stream<Object> 类型。并提醒该流将被忽略。

不会有像 Stream<Stream<BigInteger>> 这样的野兽任何地方,无论是在管道中,还是在收集器内。

最后,我要重申,将元素添加到 Stream 本质上是不可能的。 .

关于java - 为什么我的 `Stream` 在运行时被关闭?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/73451674/

相关文章:

java - 如何在 ListView 中添加最近播放的歌曲?

java - 在 map 中加入 List<String>

java - 如何让 ExecutorService 中的线程进入等待阶段

java - 将 Set 转换为 Map 时出现问题

java - 使用强制转换和 lambda 进行收集

java - 仅将 "NOPMD"字符串更改为单个规则

java - Java 输入缓冲区中的空字符持久性?

java - Runtime.exec() 的安全问题

java - 如何使用分组映射流?

list - 如何从 map 中获取 optional 值作为 optional 列表?