在将 Spark reduce
函数与 java.lang.Math.max
结合使用时,我遇到了一些意外行为。这是示例代码:
JavaPairRDD<Island, Long> populationWithFitness = parallelizedIslandPop.mapToPair(isl -> evaluateFitness(isl, fitnessCalculator));
System.out.println(populationWithFitness.values().collect().toString());
long currentMaxFitness = populationWithFitness.values().reduce(Math::max);
System.out.println("After Reduce: " + currentMaxFitness);
上面的代码被多次调用,大多数时候会产生意想不到的结果,如下所示:
[-2754285, -2535458, -2626449, -3182283] //printed RDD after collect
After Reduce: -2392513 //value produced by reducer
如您所见,reducer 生成值 -2392513
,但是与 RDD 的打印值进行比较时,该值甚至不在 RDD 中。为什么? collect()
是否影响 reduce()
?我也尝试了其他方法,首先减少然后收集原始 RDD,我仍然观察到这种奇怪的行为。我认为从 java.Math 库传递静态方法可能会在序列化时导致问题,但引用此 Spark Quick Start Tutorial他们还在 reducer
中使用 Math.max
,显然它应该可以工作。
有什么想法吗?
谢谢
编辑:
附加信息:此代码片段是具有多次迭代的较大程序的一部分,并且在每次迭代中都会调用它。第一次迭代产生正确的结果,其中从 reducer
产生的 maxValue
是正确的值,但所有其他迭代都产生奇怪的结果
编辑2:
当我像这样连续打印 populationWithFitness.values().collect().toString()
三次时:
JavaPairRDD<Island, Long> populationWithFitness = parallelizedIslandPop.mapToPair(isl -> evaluateFitness(isl, fitnessCalculator));
System.out.println(populationWithFitness.values().collect().toString());
System.out.println(populationWithFitness.values().collect().toString());
System.out.println(populationWithFitness.values().collect().toString());
long currentMaxFitness = populationWithFitness.values().reduce(Math::max);
System.out.println("After Reduce: " + currentMaxFitness);
我得到的输出如下所示:
Generation 1
[-3187591, -3984035, -3508984, -3054649]
[-3187591, -3984035, -3508984, -3054649]
[-3187591, -3984035, -3508984, -3054649]
After Reduce: -3054649
Generation 2
[-3084310, -3931687, -3508984, -3054649]
[-3084310, -3847178, -3508984, -2701881]
[-3148206, -3984035, -2806859, -2989184]
After Reduce: -2949478
Generation 3
[-3187591, -3984035, -3696853, -3054649]
[-3187591, -3984035, -3178920, -3015411]
[-3148206, -3804759, -3657984, -2701881]
After Reduce: -2710313
Generation 4
[-3187591, -2982220, -3310753, -3054649]
[-3148206, -2985628, -3657984, -2701881]
[-3148206, -2706580, -3451228, -2989184]
After Reduce: -2692651
.
.
.
正如您所看到的,在第一次迭代中一切正常,但在所有接下来的迭代中它会产生奇怪的输出。我想问题是它与惰性评估有关,当我调用收集时它没有应用转换,但我不确定。
我还尝试用 JavaDoubleRDD
替换 reduce(Math::max)
并在此 JavaDoubleRDD
上调用 max
> 但结果是一样的:
JavaDoubleRDD stats = populationWithFitness.mapToDouble(tup -> tup._2());
long currentMaxFitness = stats.max().longValue();
另一个重要的一点是,我正在本地模式下测试此代码,并使用参数运行它:
spark --class "main.TravellingSalesmanMain" --master local[4] exampletravellingsalesman-1.0-SNAPSHOT.jar > sparkoutput.txt
最佳答案
这很可能(99%)发生在 evaluateFitness(isl,fitnessCalculator)
内部的某个地方。它似乎正在使用某种不可重现的源,因此发送回的结果与不同的运行不同。请记住,Spark 是惰性的,并且执行将在每个连续操作上重新运行。您可以使用缓存来帮助实现此目的,但即使这样也可能会失败(节点失败/数据超出缓存)。最好的选择是在这里使用检查点,但更重要的是,您应该更改执行本身,使其具有幂等性。
关于java - Apache Spark Reduce 与 java.lang.Math.max 意外行为,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36107239/