Java Streams - 有效地对排序流上的项目进行分组

标签 java java-8 java-stream

我正在寻找一种方法来实现非终端分组操作,这样内存开销就会最小。

例如,考虑 distinct()。在一般情况下,它别无选择,只能收集所有不同的项目,然后才将它们向前传输。但是,如果我们知道输入流已经排序,则可以使用最少的内存“即时”完成操作。

我知道我可以使用迭代器包装器并自己实现分组逻辑来为迭代器实现这一点。是否有更简单的方法来使用流 API 来实现这一点?

--编辑--

我找到了一种滥用 Stream.flatMap(..) 的方法来实现这一点:

  private static class DedupSeq implements IntFunction<IntStream> {
    private Integer prev;

    @Override
    public IntStream apply(int value) {
      IntStream res = (prev != null && value == prev)? IntStream.empty() : IntStream.of(value);
      prev = value;
      return res;
    }    
  }

然后:

IntStream.of(1,1,3,3,3,4,4,5).flatMap(new DedupSeq()).forEach(System.out::println);

打印:

1
3
4
5

通过一些更改,相同的技术可用于任何类型的内存高效序列流分组。无论如何,我不太喜欢这个解决方案,我一直在寻找更自然的东西(例如映射或过滤的工作方式)。此外,我在这里打破了约定,因为提供给 flatMap(..) 的函数是有状态的。

最佳答案

如果您想要一个不向不应该具有可变状态的函数添加可变状态的解决方案,您可以求助于collect:

static void distinctForSorted(IntStream s, IntConsumer action) {
    s.collect(()->new long[]{Long.MIN_VALUE},
              (a, i)->{ if(a[0]!=i) { action.accept(i); assert i>a[0]; a[0]=i; }},
              (a, b)->{ throw new UnsupportedOperationException(); });
}

这是可行的,因为它是使用可变容器的预期方式,但是,它不能并行工作,因为在任意流位置拆分意味着有可能在两个(或更多)线程中遇到一个值。

如果您想要一个通用的 IntStream 而不是 forEach 操作,则首选 Spliterator 低级解决方案,尽管会增加复杂性。

static IntStream distinctForSorted(IntStream s) {
    Spliterator.OfInt sp=s.spliterator();
    return StreamSupport.intStream(
      new Spliterators.AbstractIntSpliterator(sp.estimateSize(),
      Spliterator.DISTINCT|Spliterator.SORTED|Spliterator.NONNULL|Spliterator.ORDERED) {
        long last=Long.MIN_VALUE;
        @Override
        public boolean tryAdvance(IntConsumer action) {
            long prev=last;
            do if(!sp.tryAdvance(distinct(action))) return false; while(prev==last);
            return true;
        }
        @Override
        public void forEachRemaining(IntConsumer action) {
            sp.forEachRemaining(distinct(action));
        }
        @Override
        public Comparator<? super Integer> getComparator() {
            return null;
        }
        private IntConsumer distinct(IntConsumer c) {
            return i-> {
                if(i==last) return;
                assert i>last;
                last=i;
                c.accept(i);
            };
        }
    }, false);
}

它甚至继承了并行支持,尽管它通过在另一个线程中处理它们之前预取一些值来工作,因此它不会加速 distinct 操作,但如果有计算,可能会加速后续操作激烈的。


为了完成,这里有一个针对任意(即未排序)IntStream 的独特操作,它不依赖于“装箱加 HashMap”,因此可能会有更好的效果内存占用:

static IntStream distinct(IntStream s) {
    boolean parallel=s.isParallel();
    s=s.collect(BitSet::new, BitSet::set, BitSet::or).stream();
    if(parallel) s=s.parallel();
    return s;
}

它仅适用于正 int 值;将其扩展到完整的 32 位范围将需要两个 BitSet,因此看起来不那么简洁,但通常用例允许将存储限制在 31 位范围内甚至更低......

关于Java Streams - 有效地对排序流上的项目进行分组,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29588167/

相关文章:

java - 从 CompletableFuture 抛出异常

基于对象属性的Java 8流过滤器

serialization - 有没有办法在 Java 8 中使用 Files.lines 读取序列化文件

java - 从 SFTP 位置下载 zip 或 exe 文件

java - Spring 3.0 - 无法找到 XML 模式命名空间的 Spring NamespaceHandler [http ://www. springframework.org/schema/security]

java - 使用流合并两个集合,但仅合并唯一值,并使用谓词而不是等于?

java - ()->System.out.println ("done") 是什么意思?

java - Java中的错误消息(流和Lambda理解)

java - 登录后在REST中提取JSESSIONID

java - 我们总是需要关闭流吗?