java - Apache Spark Reduce 与 java.lang.Math.max 意外行为

标签 java apache-spark

在将 Spark reduce 函数与 java.lang.Math.max 结合使用时,我遇到了一些意外行为。这是示例代码:

JavaPairRDD<Island, Long> populationWithFitness = parallelizedIslandPop.mapToPair(isl -> evaluateFitness(isl, fitnessCalculator));
System.out.println(populationWithFitness.values().collect().toString());
long currentMaxFitness = populationWithFitness.values().reduce(Math::max);
System.out.println("After Reduce: " + currentMaxFitness);

上面的代码被多次调用,大多数时候会产生意想不到的结果,如下所示:

[-2754285, -2535458, -2626449, -3182283] //printed RDD after collect
After Reduce: -2392513 //value produced by reducer

如您所见,reducer 生成值 -2392513,但是与 RDD 的打印值进行比较时,该值甚至不在 RDD 中。为什么? collect() 是否影响 reduce()?我也尝试了其他方法,首先减少然后收集原始 RDD,我仍然观察到这种奇怪的行为。我认为从 java.Math 库传递静态方法可能会在序列化时导致问题,但引用此 Spark Quick Start Tutorial他们还在 reducer 中使用 Math.max ,显然它应该可以工作。

有什么想法吗?

谢谢

编辑:

附加信息:此代码片段是具有多次迭代的较大程序的一部分,并且在每次迭代中都会调用它。第一次迭代产生正确的结果,其中从 reducer 产生的 maxValue 是正确的值,但所有其他迭代都产生奇怪的结果

编辑2:

当我像这样连续打印 populationWithFitness.values().collect().toString() 三次时:

JavaPairRDD<Island, Long> populationWithFitness = parallelizedIslandPop.mapToPair(isl -> evaluateFitness(isl, fitnessCalculator));
System.out.println(populationWithFitness.values().collect().toString());
System.out.println(populationWithFitness.values().collect().toString());
System.out.println(populationWithFitness.values().collect().toString());
long currentMaxFitness = populationWithFitness.values().reduce(Math::max);
System.out.println("After Reduce: " + currentMaxFitness);

我得到的输出如下所示:

Generation 1
[-3187591, -3984035, -3508984, -3054649]
[-3187591, -3984035, -3508984, -3054649]
[-3187591, -3984035, -3508984, -3054649]
After Reduce: -3054649
Generation 2
[-3084310, -3931687, -3508984, -3054649]
[-3084310, -3847178, -3508984, -2701881]
[-3148206, -3984035, -2806859, -2989184]
After Reduce: -2949478
Generation 3
[-3187591, -3984035, -3696853, -3054649]
[-3187591, -3984035, -3178920, -3015411]
[-3148206, -3804759, -3657984, -2701881]
After Reduce: -2710313
Generation 4
[-3187591, -2982220, -3310753, -3054649]
[-3148206, -2985628, -3657984, -2701881]
[-3148206, -2706580, -3451228, -2989184]
After Reduce: -2692651
.
.
.
正如您所看到的,在第一次迭代中一切正常,但在所有接下来的迭代中它会产生奇怪的输出。我想问题是它与惰性评估有关,当我调用收集时它没有应用转换,但我不确定。

我还尝试用 JavaDoubleRDD 替换 reduce(Math::max) 并在此 JavaDoubleRDD 上调用 max > 但结果是一样的:

JavaDoubleRDD stats = populationWithFitness.mapToDouble(tup -> tup._2());
long currentMaxFitness = stats.max().longValue();

另一个重要的一点是,我正在本地模式下测试此代码,并使用参数运行它:

spark --class "main.TravellingSalesmanMain" --master local[4] exampletravellingsalesman-1.0-SNAPSHOT.jar > sparkoutput.txt

最佳答案

这很可能(99%)发生在 evaluateFitness(isl,fitnessCalculator) 内部的某个地方。它似乎正在使用某种不可重现的源,因此发送回的结果与不同的运行不同。请记住,Spark 是惰性的,并且执行将在每个连续操作上重新运行。您可以使用缓存来帮助实现此目的,但即使这样也可能会失败(节点失败/数据超出缓存)。最好的选择是在这里使用检查点,但更重要的是,您应该更改执行本身,使其具有幂等性。

关于java - Apache Spark Reduce 与 java.lang.Math.max 意外行为,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36107239/

相关文章:

java - 循环未按预期工作

java - 在android中反转/突出显示按钮的适当方法

java - 有没有一种方法可以返回没有 String[] 参数的数组?

python - Spark : use reduceByKey instead of groupByKey and mapByValues

python - Spark可调参数特别是执行程序内存

java - 如何在套接字连接中发送不同的数据类型

java - Jooq - 忽略重复项

hadoop - 如何在 Spark2 中启用 spark.history.fs.cleaner?

apache-spark - 应该如何配置spark sql来访问hive Metastore?

apache-spark - 基于流的应用程序中的受控/手动错误/恢复处理