concurrency - 如果并行处理,为什么在无限的数字流中按素性过滤会永远持续下去?

标签 concurrency java-8 java-stream

我正在创建一个无限的整数流,从 2 亿开始,使用朴素的素性测试实现过滤这个流以生成负载并将结果限制为 10。

Predicate<Integer> isPrime = new Predicate<Integer>() {
    @Override
    public boolean test(Integer n) {
        for (int i = 2; i < n; i++) {
            if (n % i == 0) return false;   
        }
        return true;
    }
};

Stream.iterate(200_000_000, n -> ++n)
    .filter(isPrime)
    .limit(10)
    .forEach(i -> System.out.print(i + " "));

这按预期工作。

现在,如果我在过滤之前添加对 parallel() 的调用,则不会产生任何内容并且处理不会完成。
Stream.iterate(200_000_000, n -> ++n)
    .parallel()
    .filter(isPrime)
    .limit(10)
    .forEach(i -> System.out.print(i + " "));

有人可以指出我在这里发生的事情的正确方向吗?

编辑:我不是在寻找更好的素性测试实现(它旨在成为一个长期运行的实现),而是为了解释使用并行流的负面影响。

最佳答案

处理实际上已完成,但可能需要很长时间,具体取决于您机器上的硬件线程数。 API documentation about limit 警告并行流可能会很慢。

实际上,并行流首先根据可用的并行度将计算拆分为几个部分,对每个部分执行计算,然后将结果连接在一起。你的任务有多少部分?每个普通 FJP 线程(= Runtime.getRuntime().availableProcessors())加上(有时?)一个用于当前线程,如果它不在 FJP 中。你可以控制它添加

System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "4");

实际上,对于您的任务,您设置的数字越小,计算速度就越快。

如何拆分无限任务?您的特定任务由 IteratorSpliterator 处理,其中 trySplit方法创建从 1024 开始不断增加大小的块。您可以自己尝试:
Spliterator<Integer> spliterator = Stream.iterate(200_000_000, n -> ++n).spliterator();
Spliterator[] spliterators = new Spliterator[10];
for(int i=0; i<spliterators.length; i++) {
    spliterators[i] = spliterator.trySplit();
}
for(int i=0; i<spliterators.length; i++) {
    System.out.print((i+1)+": ");
    spliterators[i].tryAdvance(System.out::println);
}       

所以第一个块处理范围 200000000-200001023 的数字,第二个处理范围 200001024-200003071 的数字,依此类推。如果您只有 1 个硬件线程,您的任务将被拆分为两个块,因此将检查 3072。如果您有 8 个硬件线程,您的任务将被拆分为 9 个块并检查 46080 个数字。只有在处理完所有块后,并行计算才会停止。将任务拆分为如此大的块的启发式方法在您的情况下效果不佳,但是如果该区域周围的质数在数千个数字中出现一次,您会看到性能提升。

可能您的特定场景可以在内部进行优化(即,如果第一个线程发现限制条件已经达到,则停止计算)。随时向 Java 错误跟踪器报告错误。

更新 在深入了解 Stream API 之后,我得出结论,当前的行为是一个错误,raised an issue并发布了 patch .该补丁很可能会被 JDK9 接受,甚至可能会向后移植到 JDK 8u 分支。使用我的补丁,并行版本仍然没有提高性能,但至少它的工作时间与顺序流工作时间相当。

关于concurrency - 如果并行处理,为什么在无限的数字流中按素性过滤会永远持续下去?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30001446/

相关文章:

Java并发: How to select and configure Executors

java - 具有过滤器链接的 ParallelStream

java - 如果条件取决于当前值和先前值,我如何检查 takeWhile 中的条件?

lambda - 为什么在不安全操作之后而不是在整个 forEach 循环之后抛出 ConcurrentModificationException ?

Java 8 流媒体 : How to convert list of objects to a list of its selected properties

java - 在迭代 ConcurrentHashMap 时添加和删除值

安卓 2.1 GoogleMaps ItemizedOverlay ConcurrentModificationException

java - 为 Unsafe.putOrdered*() 的发布实现获取?

java - Collectors.groupingBy 进入对象列表?

java - 根据使用 Java 8 流的元素之间的差异拆分数字的有序列表多个列表