java - Stream.reduce() 和 Stream.collect() 之间令人惊讶的性能差异

标签 java performance lambda java-8 java-stream

我想比较两个 Java8 流终端操作 reduce()collect() 的并行性能。

让我们看一下下面的 Java8 并行流示例:

import java.math.BigInteger;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Stream;

import static java.math.BigInteger.ONE;

public class StartMe {

    static Function<Long, BigInteger> fac;

    static {
        fac = x -> x==0? ONE : BigInteger.valueOf(x).multiply(fac.apply(x - 1));
    }

    static long N = 2000;

    static Supplier<BigInteger[]> one() {
        BigInteger[] result = new BigInteger[1];
        result[0] = ONE;
        return () -> result;
    }

    static BiConsumer<BigInteger[], ? super BigInteger> accumulator() {
        return (BigInteger[] ba, BigInteger b) -> {
            synchronized (fac) {
                ba[0] = ba[0].multiply(b);
            }
        };
    }

    static BiConsumer<BigInteger[], BigInteger[]> combiner() {
        return (BigInteger[] b1, BigInteger[] b2) -> {};
    }

    public static void main(String[] args) throws Exception {
        long t0 = System.currentTimeMillis();

        BigInteger result1 = Stream.iterate(ONE, x -> x.add(ONE)).parallel().limit(N).reduce(ONE, BigInteger::multiply);
        long t1 = System.currentTimeMillis();

        BigInteger[] result2 = Stream.iterate(ONE, x -> x.add(ONE)).parallel().limit(N).collect(one(), accumulator(), combiner());
        long t2 = System.currentTimeMillis();

        BigInteger result3 = fac.apply(N);
        long t3 = System.currentTimeMillis();

        System.out.println("reduce():  deltaT = " + (t1-t0) + "ms, result 1 = " + result1);
        System.out.println("collect(): deltaT = " + (t2-t1) + "ms, result 2 = " + result2[0]);
        System.out.println("recursive: deltaT = " + (t3-t2) + "ms, result 3 = " + result3);

    }
}

它计算 n!使用一些 - 诚然很奇怪 ;-) - 算法。

然而,性能结果却令人惊讶:

 reduce():  deltaT = 44ms, result 1 = 3316275...
 collect(): deltaT = 22ms, result 2 = 3316275...
 recursive: deltaT = 11ms, result 3 = 3316275...

一些说明:

  • 我必须同步 accumulator(),因为它并行访问同一个数组。
  • 我预计 reduce()collect() 会产生相同的性能,但 reduce() 慢约 2 倍>collect(),即使collect()也必须同步!
  • 最快的算法是顺序递归算法(这可能显示并行流管理的巨大开销)

没想到reduce()的性能比collect()差。为什么会这样?

最佳答案

基本上,您是在测量第一次执行的代码的初始开销。不仅优化器还没有任何工作,您还在测量加载、验证和初始化类的开销。

因此,评估时间减少也就不足为奇了,因为每次评估都可以重用已为上一次评估加载的类。循环运行所有三个评估,甚至只是更改顺序都会给您带来完全不同的画面。

唯一可预测的结果是简单的递归评估将具有最小的初始开销,因为它不需要加载 Stream API 类。


如果您多次运行代码,或者更好地使用复杂的基准测试工具,我想您会得到与我类似的结果,其中 reduce明显优于 collect并且确实比单线程方法更快。

原因collect较慢是因为你完全错误地使用它。 Supplier将查询每个线程以获得不同的容器,因此累加器函数不需要需要任何额外的同步。但重要的是,组合器函数正确地工作以将不同线程的结果容器连接成一个结果。

正确的做法是:

BigInteger[] result2 = Stream.iterate(ONE, x -> x.add(ONE)).parallel().limit(N)
  .collect(()->new BigInteger[]{ONE},
           (a,v)->a[0]=a[0].multiply(v), (a,b)->a[0]=a[0].multiply(b[0]));

在我的系统上,它的性能与 reduce 相当方法。由于使用数组作为可变容器不能改变 BigInteger 的不可变性质, 使用 collect 没有优势在这里,使用 reduce是直截了当的,并且如前所述,如果正确使用这两种方法,则具有相同的性能。


顺便说一句,我不明白为什么这么多程序员试图创建自引用 lambda 表达式。递归函数的直接方法仍然是方法:

static BigInteger fac(long x) {
    return x==0? ONE : BigInteger.valueOf(x).multiply(fac(x - 1));
}
static final Function<Long, BigInteger> fac=StartMe::fac;

(尽管在您的代码中,您根本不需要 Function<Long, BigInteger>,只需直接调用 fac(long))。


最后一点,Stream.iterateStream.limit ,对于并行执行来说真的很糟糕。使用具有可预测大小和独立操作的流将显着优于您的解决方案:

BigInteger result4 = LongStream.rangeClosed(1, N).parallel()
    .mapToObj(BigInteger::valueOf).reduce(BigInteger::multiply).orElse(ONE);

关于java - Stream.reduce() 和 Stream.collect() 之间令人惊讶的性能差异,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28988262/

相关文章:

Php - 理解 create_function() - 传递简单变量

java - 对 Lambda 对象的方法调用返回值

java - 我在 android 上使用 java 时遇到异常 (java.lang.NoClassDefFoundError),为什么?

java - CipherOutputStream 和 FileOutputStream(someFile, true) 在追加数据时产生垃圾

java - 带路径压缩算法的加权快速联合

javascript - 删除未使用 Assets 的繁重任务、脚本或应用程序?

python - 复选框以取消选中所有其他复选框

java - 如何从 Java 类到 GlassFish 创建新用户?

php - 使用 Instagram API 请求循环遍历用户真的很慢

matlab - Matlab 中的快速关联数组或映射