我编写了一个程序,通过将数组拆分为相等的 block 并在单个线程中使用冒泡排序对多线程数组进行排序。然后,我使用了合并算法来合并这两个数组。
我想将此程序与使用 Streams 对数组进行排序的程序进行比较。我的问题是,如果我将数组传递到流中,我将如何进行拆分、排序和合并以并行执行排序,但通过使用并行流而不是创建我自己的线程/可运行程序等。
有什么想法吗?
最佳答案
我假设您的问题纯粹是教育性和实验性的,没有任何实际应用,因为在 Java 中有更有效的方法对元素进行排序。如果您想在这里使用 Stream API,您可以创建一个执行冒泡排序的拆分器和一个在组合器中执行合并排序的收集器。
这是拆分器:
static class BubbleSpliterator<T> implements Spliterator<T> {
private final Comparator<? super T> cmp;
private final Spliterator<T> source;
private T[] data;
private int offset;
public BubbleSpliterator(Spliterator<T> source, Comparator<? super T> cmp) {
this.source = source;
this.cmp = cmp;
}
@SuppressWarnings("unchecked")
private void init() {
if (data != null)
return;
Stream.Builder<T> buf = Stream.builder();
source.forEachRemaining(buf);
data = (T[]) buf.build().toArray();
bubble(data, cmp);
}
private static <T> void bubble(T[] data, Comparator<? super T> cmp) {
for (int i = 0; i < data.length - 1; i++)
for (int j = i + 1; j < data.length; j++) {
if (cmp.compare(data[i], data[j]) > 0) {
T tmp = data[i];
data[i] = data[j];
data[j] = tmp;
}
}
}
@Override
public boolean tryAdvance(Consumer<? super T> action) {
init();
if (offset >= data.length)
return false;
action.accept(data[offset++]);
return true;
}
@Override
public void forEachRemaining(Consumer<? super T> action) {
init();
for (int i = offset; i < data.length; i++)
action.accept(data[i]);
offset = data.length;
}
@Override
public Spliterator<T> trySplit() {
if (data != null)
return null;
Spliterator<T> prefix = source.trySplit();
return prefix == null ? null : new BubbleSpliterator<>(prefix, cmp);
}
@Override
public long estimateSize() {
if (data != null)
return data.length - offset;
return source.estimateSize();
}
@Override
public int characteristics() {
return source.characteristics();
}
public static <T> Stream<T> stream(Stream<T> source,
Comparator<? super T> comparator) {
Spliterator<T> spltr = source.spliterator();
return StreamSupport.stream(new BubbleSpliterator<>(spltr, comparator),
source.isParallel()).onClose(source::close);
}
}
它获取源,将拆分委托(delegate)给源,但是当请求元素时,它会将源元素转储到数组并为它们执行冒泡排序。你可以这样检查:
int[] data = new Random(1).ints(100, 0, 1000).toArray();
Comparator<Integer> comparator = Comparator.naturalOrder();
List<Integer> list = BubbleSpliterator.stream(Arrays.stream(data).parallel().boxed(), comparator).collect(
Collectors.toList());
System.out.println(list);
结果取决于您机器上的硬件线程数,可能如下所示:
[254, 313, 588, 847, 904, 985, 434, 473, 569, 606, 748, 978, 234, 262, 263, 317, 562, 592, 99, 189, 310,...]
在这里您可以看到输出由几个排序的序列组成。此类序列的数量对应于 Stream API 创建的并行任务的数量。
现在要通过归并排序来组合已排序的序列,您可以像这样编写一个特殊的收集器:
static <T> List<T> merge(List<T> l1, List<T> l2, Comparator<? super T> cmp) {
List<T> result = new ArrayList<>(l1.size()+l2.size());
int i=0, j=0;
while(i < l1.size() && j < l2.size()) {
if(cmp.compare(l1.get(i), l2.get(j)) <= 0) {
result.add(l1.get(i++));
} else {
result.add(l2.get(j++));
}
}
result.addAll(l1.subList(i, l1.size()));
result.addAll(l2.subList(j, l2.size()));
return result;
}
static <T> Collector<T, ?, List<T>> mergeSorting(Comparator<? super T> cmp) {
return Collector.<T, List<T>> of(ArrayList::new, List::add,
(l1, l2) -> merge(l1, l2, cmp));
}
在顺序 more 中,它的工作方式与 Collectors.toList()
相同,但在并行时,它执行合并排序,假设两个输入列表都已排序。我的 mergeSorting 实现可能不是最理想的,您可以写一些更好的东西。
因此,要通过 Stream API 对所有内容进行排序,您可以同时使用 BubbleSpliterator
和 mergeSorting
收集器:
int[] data = new Random(1).ints(100, 0, 1000).toArray();
Comparator<Integer> comparator = Comparator.naturalOrder();
List<Integer> list = BubbleSpliterator.stream(Arrays.stream(data).parallel().boxed(), comparator).collect(
mergeSorting(comparator));
System.out.println(list);
结果将完全排序。
此实现多次执行不必要的输入数据复制,因此我猜想,自定义冒泡+合并实现在性能方面可能胜过此实现。
关于java - 使用 Streams 对数组进行并行排序,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35579985/