java - 通过鉴别器函数对流进行分区

标签 java java-8 java-stream

Streams API 中缺少的功能之一是“分区依据”转换,例如 Clojure 中定义的.假设我想重现 Hibernate 的 fetch join:我想发出单个 SQL SELECT 语句以从结果中接收此类对象:

class Family {
   String surname;
   List<String> members;
}

我发布:

SELECT f.name, m.name 
FROM Family f JOIN Member m on m.family_id = f.id
ORDER BY f.name

然后我检索到 (f.name, m.name) 的平面流记录。现在我需要将其转换为 Family 的流对象,里面有它的成员列表。假设我已经有一个 Stream<ResultRow> ;现在我需要把它变成一个 Stream<List<ResultRow>>然后通过映射转换对其进行操作,将其转换为 Stream<Family> .

转换的语义如下:继续将流收集到 List 中。只要提供的鉴别器函数一直返回相同的值;一旦值改变,发出 List作为输出流的元素并开始收集新的 List .

希望能写出这样的代码(我已经有了resultStream方法):

Stream<ResultRow> dbStream = resultStream(queryBuilder.createQuery(
        "SELECT f.name, m.name"
      + " FROM Family f JOIN Member m on m.family_id = f.id"
      + " ORDER BY f.name"));
Stream<List<ResultRow> partitioned = partitionBy(r -> r.string(0), dbStream);
Stream<Family> = partitioned.map(rs -> {
                    Family f = new Family(rs.get(0).string(0));
                    f.members = rs.stream().map(r -> r.string(1)).collect(toList());
                    return f;
                 });

不用说,我希望生成的流保持惰性(非物化),因为我希望能够处理任何大小的结果集而不会达到任何 O(n) 内存限制。如果没有这个关键要求,我会对提供的 groupingBy 感到满意 Collection 家。

最佳答案

解决方案要求我们定义一个自定义的Spliterator可用于构造分区流。我们需要通过它自己的拆分器访问输入流并将其包装到我们的拆分器中。然后从我们的自定义拆分器构建输出流。

以下 Spliterator 将转换任何 Stream<E>进入Stream<List<E>>提供了 Function<E, ?>作为判别函数。请注意,必须对输入流进行排序才能使此操作有意义。

import java.util.*;
import java.util.Spliterators.AbstractSpliterator;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import static java.util.Comparator.naturalOrder;

public class PartitionBySpliterator<E> extends AbstractSpliterator<List<E>> {
    private final Spliterator<E> spliterator;
    private final Function<? super E, ?> partitionBy;
    private HoldingConsumer<E> holder;
    private Comparator<List<E>> comparator;

    public PartitionBySpliterator(
            Spliterator<E> toWrap,
            Function<? super E, ?> partitionBy
    ) {
        super(Long.MAX_VALUE, toWrap.characteristics() & ~SIZED | NONNULL);
        this.spliterator = toWrap;
        this.partitionBy = partitionBy;
    }

    public static <E> Stream<List<E>> partitionBy(
            Function<E, ?> partitionBy, Stream<E> in
    ) {
        return StreamSupport.stream(
                new PartitionBySpliterator<>(in.spliterator(), partitionBy), false);
    }

    @Override
    public boolean tryAdvance(Consumer<? super List<E>> action) {
        final HoldingConsumer<E> h;
        if (holder == null) {
            h = new HoldingConsumer<>();
            if (!spliterator.tryAdvance(h)) {
              return false;
            }
            holder = h;
        } else {
          h = holder;
        }
        final ArrayList<E> partition = new ArrayList<>();
        final Object partitionKey = partitionBy.apply(h.value);
        boolean didAdvance;
        do {
          partition.add(h.value);
        }
        while ((didAdvance = spliterator.tryAdvance(h))
                && Objects.equals(partitionBy.apply(h.value), partitionKey));
        if (!didAdvance) {
          holder = null;
        }
        action.accept(partition);
        return true;
    }

    static final class HoldingConsumer<T> implements Consumer<T> {
        T value;

        @Override
        public void accept(T value) {
            this.value = value;
        }
    }

    @Override
    public Comparator<? super List<E>> getComparator() {
        final Comparator<List<E>> c = this.comparator;
        return c != null ? c : (this.comparator = comparator());
    }

    private Comparator<List<E>> comparator() {
        @SuppressWarnings({"unchecked", "rawtypes"})
        final Comparator<? super E> innerComparator =
                Optional.ofNullable(spliterator.getComparator())
                        .orElse((Comparator) naturalOrder());
        return (left, right) -> {
            final int c = innerComparator.compare(left.get(0), right.get(0));
            return c != 0 ? c : innerComparator.compare(
                    left.get(left.size() - 1), right.get(right.size() - 1));
        };
    }
}

关于java - 通过鉴别器函数对流进行分区,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28363323/

相关文章:

java - JxBrowser:cookie中的url是什么意思?

java - 仅在源代码级别 1.8 或更高版本中才允许引用接口(interface)静态方法

JAVA- Integer.parseInt( str ) 给出 NumberFormatException,输入是代表整数的 str

java - 由于 cloudera quickstart vm 中的 yarn 容器错误,Hipi 平均像素数程序失败

Java8 Streams - 使用 Stream Distinct 删除重复项

java - 使用 com.google.code.gson 在 Java8 中创建 JSON 对象

java - 显示为 XX :ParallelGCThreads 选择的默认值

入口集上的 Java 8 流映射

java - Java 8 mapToInt ( mapToInt(e -> e) ) 如何准确地提高性能?

Java 无法序列化包含带有比较器的 TreeMap 的对象