java - 如何在没有 isFinite() 和 isOrdered() 方法的情况下安全地使用 Java Streams?

标签 java java-stream

关于java方法是否应该返回Collections or Streams的问题,其中 Brian Goetz 回答说,即使对于有限序列,Streams 通常也是首选。

但在我看来,目前许多来自其他地方的 Streams 操作无法安全地执行,并且防御性代码保护是不可能的,因为 Streams 不会显示它们是无限的还是无序的。

如果并行是我想在 Stream() 上执行的操作的问题,我可以调用 isParallel() 来检查或顺序调用以确保计算是并行的(如果我记得的话)。

但是如果有序性或有限性(大小)与我的程序的安全性相关,我就不能编写安全措施。

假设我使用了一个实现这个虚构接口(interface)的库:

public interface CoordinateServer {
    public Stream<Integer> coordinates();
    // example implementations:
    // finite, ordered, sequential
    // IntStream.range(0, 100).boxed()
    // final AtomicInteger atomic = new AtomicInteger();
    
    // // infinite, unordered, sequential
    // Stream.generate(() -> atomic2.incrementAndGet()) 

    // infinite, unordered, parallel
    // Stream.generate(() -> atomic2.incrementAndGet()).parallel()
    
    // finite, ordered, sequential, should-be-closed
    // Files.lines(Path.path("coordinates.txt")).map(Integer::parseInt)
}

那么我可以在此流上安全地调用哪些操作来编写正确的算法?

似乎如果我可能想将元素写入文件作为副作用,我需要担心流是并行的:

// if stream is parallel, which order will be written to file?
coordinates().peek(i -> {writeToFile(i)}).count();
// how should I remember to always add sequential() in  such cases?

如果它是并行的,它是基于什么线程池并行的?

如果我想对流进行排序(或其他非短路操作),我需要以某种方式对它的无限性保持谨慎:

coordinates().sorted().limit(1000).collect(toList()); // will this terminate?
coordinates().allMatch(x -> x > 0); // will this terminate?

我可以在排序前施加一个限制,但如果我期望一个未知大小的有限流,那应该是哪个魔数(Magic Number)?

最后也许我想并行计算以节省时间然后收集结果:

// will result list maintain the same order as sequential?
coordinates().map(i -> complexLookup(i)).parallel().collect(toList());

但是如果流没有排序(在那个版本的库中),那么结果可能会由于并行处理而变得困惑。但是,除了不使用并行(这违背了性能目的)之外,我该如何防范呢?

集合明确表示有限或无限,是否有顺序,并且它们不携带处理模式或线程池。这些似乎是 API 的宝贵属性。

此外,Streams may sometimes need to be closed ,但最常见的不是。如果我从方法(方法参数)中使用流,我通常应该调用 close 吗?

此外,流可能已经被消耗,能够优雅地处理这种情况会很好,所以 check if the stream has already been consumed 会很好;

我希望有一些代码片段可用于在处理流之前验证关于流的假设,例如>

Stream<X> stream = fooLibrary.getStream();
Stream<X> safeStream = StreamPreconditions(
    stream, 
    /*maxThreshold or elements before IllegalArgumentException*/
    10_000,
    /* fail with IllegalArgumentException if not ordered */
    true
    )

最佳答案

据我所知,经过一些研究(一些实验和 here ),没有办法确定流是否有限。

不仅如此,有时它甚至无法确定,除非在运行时(例如在 java 11 中 - IntStream.generate(() -> 1).takeWhile(x -> externalCondition(x))).

你可以做的是:

  1. 您可以通过几种方式确定它是否是有限的(请注意,在这些方面收到 false 并不意味着它是无限的,只是它可能是无限的):

    1. stream.spliterator().getExactSizeIfKnown() - 如果它具有已知的确切大小,则它是有限的,否则它将返回 -1。

    2. stream.spliterator().hasCharacteristics(Spliterator.SIZED) - 如果它是 SIZED 将返回 true。

  2. 您可以通过假设最坏的情况来保护自己(取决于您的情况)。

    1. stream.sequential()/stream.parallel() - 明确设置您的首选消费类型。
    2. 对于潜在的无限流,假设每种情况下的最坏情况。

      1. 例如,假设您想要收听推文流,直到找到 Venkat 的推文为止- 这是一个潜在的无限操作,但您想等到找到这样的推文。因此,在这种情况下,只需使用 stream.filter(tweet -> isByVenkat(tweet)).findAny() - 它会迭代直到(或永远)出现这样的推文。
      2. 另一种情况,可能是更常见的情况,是想要对所有元素做某事,或者只尝试一定的时间(类似于超时)。为此,我建议在调用您的操作(collectallMatch 或类似的)之前始终调用 stream.limit(x),其中 x 是您愿意接受的尝试次数。

毕竟,我只想提一下,我认为返回一个流通常不是一个好主意,我会尽量避免它,除非有很大的好处。

关于java - 如何在没有 isFinite() 和 isOrdered() 方法的情况下安全地使用 Java Streams?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56529755/

相关文章:

java - 从 Optional<Object> 创建对象

java - 在 Java 流中查看的替代方法

java - 使用 Intellij 为 Java 代码创建 Jar 文件

java - 如何在 GXT 3.x 中实现卡住列?

java - 使用Java解决ATM程序中的两个错误

java - 使用 Java 8 流处理嵌套集合

java - 使用 anyMatch 的流中的空安全

java - 将流元素映射到 LocalDate,而不收集到列表

java - 在构造函数后面有一个覆盖方法的部分的 Java 语法是什么?

c# - .Net 是否有 org.apache.commons.lang.StringEscapeUtils 的实现?