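One of the features missing from the Streams API is a "partition by" transformation, such as the one Clojure defines (partition-by). Suppose I want to reproduce Hibernate's fetch join: I want to issue a single SQL SELECT statement and build objects like this from the result: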
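class Family {
    String surname;
    List<String> members;

    // Constructor used by the mapping step that builds Family objects from rows
    Family(String surname) {
        this.surname = surname;
    }
}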
I issue:
SELECT f.name, m.name
FROM Family f JOIN Member m on m.family_id = f.id
ORDER BY f.name
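I then get back a flat stream of (f.name, m.name) records. Now I need to turn it into a stream of Family objects, each holding its list of members. Assume I already have a Stream<ResultRow>; I need to turn it into a Stream<List<ResultRow>> and then apply a map transformation to turn that into a Stream<Family>.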
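The semantics of the transformation are as follows: keep collecting the stream into a List for as long as the supplied discriminator function keeps returning the same value; as soon as the value changes, emit the List as an element of the output stream and start collecting a new List.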
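To make these semantics concrete, here is a small eager sketch. It is not the lazy solution being asked for, since it builds the whole result in memory; rows are modeled as hypothetical "family:member" strings, and the class and method names are illustrative only. Note that only adjacent elements with equal keys end up in the same partition, which is why the query sorts by f.name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;

// Eager illustration of the "partition by" semantics (hypothetical names).
// It materializes the whole result, so it does NOT meet the laziness requirement.
final class PartitionBySemantics {
    static <E> List<List<E>> partitionByEager(Function<? super E, ?> key, List<E> input) {
        List<List<E>> result = new ArrayList<>();
        List<E> current = new ArrayList<>();
        Object currentKey = null;
        for (E e : input) {
            Object k = key.apply(e);
            // Close the current partition whenever the discriminator value changes
            if (!current.isEmpty() && !Objects.equals(k, currentKey)) {
                result.add(current);
                current = new ArrayList<>();
            }
            current.add(e);
            currentKey = k;
        }
        if (!current.isEmpty()) {
            result.add(current); // flush the last partition
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> rows = List.of("Adams:Ann", "Adams:Bob", "Brown:Cy", "Adams:Dee");
        System.out.println(partitionByEager(r -> r.split(":")[0], rows));
        // prints [[Adams:Ann, Adams:Bob], [Brown:Cy], [Adams:Dee]]
    }
}
```

The trailing "Adams:Dee" row forms its own partition because it is not adjacent to the other "Adams" rows.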
I'd like to be able to write code like this (I already have the resultStream method):
Stream<ResultRow> dbStream = resultStream(queryBuilder.createQuery(
        "SELECT f.name, m.name"
        + " FROM Family f JOIN Member m on m.family_id = f.id"
        + " ORDER BY f.name"));
// Group consecutive rows that share the same family name (column 0)
Stream<List<ResultRow>> partitioned = partitionBy(r -> r.string(0), dbStream);
Stream<Family> families = partitioned.map(rs -> {
    Family f = new Family(rs.get(0).string(0));
    f.members = rs.stream().map(r -> r.string(1)).collect(toList());
    return f;
});
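Needless to say, I want the resulting stream to stay lazy (not materialized), because I want to be able to process result sets of any size without hitting an O(n) memory limit. Without this crucial requirement I would be happy with the provided groupingBy collector.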
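For comparison, the eager groupingBy approach could look roughly like this sketch, assuming rows are represented as hypothetical String[] pairs {family, member} instead of the question's ResultRow. It produces the right grouping, but it consumes the entire stream and holds every group in memory before returning anything, which is exactly the O(n) footprint the question wants to avoid.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Eager alternative based on Collectors.groupingBy (hypothetical class name).
// The whole input is consumed and all groups are kept in memory at once.
final class GroupingByAlternative {
    static Map<String, List<String>> membersByFamily(Stream<String[]> rows) {
        return rows.collect(Collectors.groupingBy(
                r -> r[0],            // discriminator: family name
                LinkedHashMap::new,   // keep families in encounter order
                Collectors.mapping(r -> r[1], Collectors.toList())));
    }

    public static void main(String[] args) {
        Stream<String[]> rows = Stream.of(
                new String[] {"Adams", "Ann"},
                new String[] {"Adams", "Bob"},
                new String[] {"Brown", "Cy"});
        System.out.println(membersByFamily(rows));
        // prints {Adams=[Ann, Bob], Brown=[Cy]}
    }
}
```

Unlike a partition-by, groupingBy also merges non-adjacent rows with the same key, so it does not even need sorted input; the price is full materialization.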
Best answer
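The solution requires defining a custom Spliterator that can be used to construct the partitioned stream. We access the input stream through its own spliterator and wrap that in our spliterator; the output stream is then built from our custom spliterator.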
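A minimal sketch of just the wrapping pattern, using a hypothetical pass-through CountingSpliterator that only counts how many elements it pulls from the source. It shows that a stream built with StreamSupport.stream on top of a wrapping spliterator pulls from the input one tryAdvance at a time, so nothing is read ahead and even an infinite source works.

```java
import java.util.Spliterator;
import java.util.Spliterators.AbstractSpliterator;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Illustration of the wrapping pattern (hypothetical names): take the input
// stream's spliterator, wrap it, and build a new stream on top of the wrapper.
final class WrappingDemo {
    static final class CountingSpliterator<E> extends AbstractSpliterator<E> {
        private final Spliterator<E> source;
        int pulled; // number of elements taken from the source so far

        CountingSpliterator(Spliterator<E> source) {
            // Unknown size; keep only the ORDERED flag of the source
            super(Long.MAX_VALUE, source.characteristics() & ORDERED);
            this.source = source;
        }

        @Override
        public boolean tryAdvance(Consumer<? super E> action) {
            // Delegate one element at a time: no read-ahead, no buffering
            return source.tryAdvance(e -> {
                pulled++;
                action.accept(e);
            });
        }
    }

    public static void main(String[] args) {
        CountingSpliterator<Integer> s =
                new CountingSpliterator<>(Stream.iterate(1, i -> i + 1).spliterator());
        Stream<Integer> wrapped = StreamSupport.stream(s, false);
        System.out.println(wrapped.limit(3).reduce(0, Integer::sum)); // prints 6
        System.out.println(s.pulled);                                  // prints 3
    }
}
```

With limit(3) on a sequential stream, the wrapper is asked for exactly three elements of the infinite source.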
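The following Spliterator transforms any Stream<E> into a Stream<List<E>>, given a Function<E, ?> as the discriminator function. Note that the input stream must be ordered for this operation to make sense: elements with equal keys have to be adjacent (for example, sorted by the key), otherwise the same key is split across several partitions.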
import java.util.*;
import java.util.Spliterators.AbstractSpliterator;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import static java.util.Comparator.naturalOrder;
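public class PartitionBySpliterator<E> extends AbstractSpliterator<List<E>> {
    private final Spliterator<E> spliterator;          // spliterator of the input stream
    private final Function<? super E, ?> partitionBy;  // discriminator function
    private HoldingConsumer<E> holder;                 // carries the first element of the next partition
    private Comparator<List<E>> comparator;            // created lazily by getComparator()

    public PartitionBySpliterator(
            Spliterator<E> toWrap,
            Function<? super E, ?> partitionBy
    ) {
        // The number of partitions is unknown: drop SIZED, and SUBSIZED too,
        // because reporting SUBSIZED without SIZED is inconsistent
        super(Long.MAX_VALUE, toWrap.characteristics() & ~(SIZED | SUBSIZED) | NONNULL);
        this.spliterator = toWrap;
        this.partitionBy = partitionBy;
    }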
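    // Factory: wraps the input stream's spliterator and builds a sequential stream on it
    public static <E> Stream<List<E>> partitionBy(
            Function<? super E, ?> partitionBy, Stream<E> in
    ) {
        return StreamSupport.stream(
                new PartitionBySpliterator<>(in.spliterator(), partitionBy), false)
                .onClose(in::close); // closing the result also closes the source stream
    }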
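    @Override
    public boolean tryAdvance(Consumer<? super List<E>> action) {
        final HoldingConsumer<E> h;
        if (holder == null) {
            // Nothing carried over: fetch the first element of a new partition
            h = new HoldingConsumer<>();
            if (!spliterator.tryAdvance(h)) {
                return false; // input exhausted
            }
            holder = h;
        } else {
            // h.value already holds the first element of this partition
            h = holder;
        }
        final ArrayList<E> partition = new ArrayList<>();
        final Object partitionKey = partitionBy.apply(h.value);
        boolean didAdvance;
        // Add elements while the discriminator returns the same key; the first
        // element with a different key stays in h for the next call
        do {
            partition.add(h.value);
        }
        while ((didAdvance = spliterator.tryAdvance(h))
                && Objects.equals(partitionBy.apply(h.value), partitionKey));
        if (!didAdvance) {
            holder = null; // input exhausted; the next call returns false
        }
        action.accept(partition);
        return true;
    }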
    // Consumer that simply remembers the last element passed to it
    static final class HoldingConsumer<T> implements Consumer<T> {
        T value;

        @Override
        public void accept(T value) {
            this.value = value;
        }
    }
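    // Relevant only when the input reports SORTED (the flag is inherited from it).
    // If it does not, spliterator.getComparator() throws IllegalStateException,
    // as the Spliterator contract requires.
    @Override
    public Comparator<? super List<E>> getComparator() {
        final Comparator<List<E>> c = this.comparator;
        return c != null ? c : (this.comparator = comparator());
    }

    // Orders partitions by their first element, then by their last element
    private Comparator<List<E>> comparator() {
        // A null comparator from a SORTED spliterator means natural ordering
        @SuppressWarnings({"unchecked", "rawtypes"})
        final Comparator<? super E> innerComparator =
                Optional.ofNullable(spliterator.getComparator())
                        .orElse((Comparator) naturalOrder());
        return (left, right) -> {
            final int c = innerComparator.compare(left.get(0), right.get(0));
            return c != 0 ? c : innerComparator.compare(
                    left.get(left.size() - 1), right.get(right.size() - 1));
        };
    }
}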
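For comparison, the same lazy semantics can also be sketched with a plain Iterator adapted through Spliterators.spliteratorUnknownSize. This is an alternative technique, not the answer's code, and the class name IteratorPartitionBy is hypothetical. The main method shows that it handles an infinite input and that unordered input splits one key across several partitions.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Alternative lazy sketch (hypothetical name) built on an Iterator. Like the
// Spliterator version, it reads one element past each partition and keeps it
// as the first element of the next partition.
final class IteratorPartitionBy {
    static <E> Stream<List<E>> partitionBy(Function<? super E, ?> key, Stream<E> in) {
        Iterator<E> source = in.iterator();
        Iterator<List<E>> partitions = new Iterator<List<E>>() {
            private E pending;          // first element of the next partition
            private boolean hasPending;

            @Override
            public boolean hasNext() {
                if (!hasPending && source.hasNext()) {
                    pending = source.next();
                    hasPending = true;
                }
                return hasPending;
            }

            @Override
            public List<E> next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                List<E> partition = new ArrayList<>();
                Object partitionKey = key.apply(pending);
                partition.add(pending);
                hasPending = false;
                while (source.hasNext()) {
                    E e = source.next();
                    if (!Objects.equals(key.apply(e), partitionKey)) {
                        pending = e;     // belongs to the next partition
                        hasPending = true;
                        break;
                    }
                    partition.add(e);
                }
                return partition;
            }
        };
        return StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(
                        partitions, Spliterator.ORDERED | Spliterator.NONNULL),
                false).onClose(in::close);
    }

    public static void main(String[] args) {
        // Infinite input: partitions are produced only on demand
        List<List<Integer>> firstThree = partitionBy(i -> i / 3, Stream.iterate(0, i -> i + 1))
                .limit(3)
                .collect(Collectors.toList());
        System.out.println(firstThree); // prints [[0, 1, 2], [3, 4, 5], [6, 7, 8]]

        // Unordered input: key 'a' ends up in two separate partitions
        System.out.println(partitionBy(s -> s.charAt(0), Stream.of("a1", "a2", "b1", "a3"))
                .collect(Collectors.toList())); // prints [[a1, a2], [b1], [a3]]
    }
}
```

The Iterator adapter is shorter, but it does not carry over the SORTED characteristic and comparator the way the answer's getComparator() does. In both versions the partitioning step itself holds at most the current partition plus one look-ahead element.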
A similar question, "java - Partition a stream by a discriminator function", can be found on Stack Overflow: https://stackoverflow.com/questions/28363323/