我的累加器是一个数组[Array[Int]] 在RDD的foreach操作中更新accumulator后,accumulator(0)符合预期,而accumulator(1)是完全丢失的Array(0,0,0)
在 RDD 中,累加器值为 Array(Array(4,5,6),Array(4,5,6)) 在 RDD 之外,累加器值为 Array(Array(4,5,6),Array(0,0,0))
下面是代码
import org.apache.spark.Accumulable
import org.apache.spark.AccumulableParam
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
object acc {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("Simple Application")
val sc = new SparkContext(conf)
val a =Array(Array(1,2,3),Array(4,5,6))
val rdd = sc.parallelize(a)
val initialValue = Array.fill[Array[Int]](2)(Array.fill[Int](3)(1))
val accumulator = sc.accumulable(initialValue)(MatrixAccumulatorParam)
rdd.foreach{x=>
accumulator += (x(0),0,0)
accumulator += (x(1),0,1)
accumulator += (x(2),0,2)
accumulator += (x(0),1,0)
accumulator += (x(1),1,1)
accumulator += (x(2),1,2)
println("accumulator value in rdd is"+accumulator.localValue)
}
println("accumulator value out of rdd is :" + accumulator.value )
}
}
object MatrixAccumulatorParam extends AccumulableParam[Array[Array[Int]], (Int, Int, Int)] {
def zero(initialValue: Array[Array[Int]]): Array[Array[Int]] = {
initialValue
}
def addAccumulator(acc: Array[Array[Int]], value: (Int, Int, Int)): Array[Array[Int]] = {
acc(value._2)(value._3) = value._1
acc
}
def addInPlace(m1: Array[Array[Int]], m2: Array[Array[Int]]): Array[Array[Int]] = {
val columnLength: Int = m1.length
val rowLength: Int = m1(0).length
var updatedMatrix = Array.ofDim[Int](columnLength, rowLength)
var j: Int = 0
while (j < columnLength) {
var i =0
while (i < rowLength) {
val a = Math.max(m1(j)(i), m2(j)(i))
updatedMatrix(j)(i) = a
i += 1
}
j += 1
}
updatedMatrix
}
}
结果: 在 RDD 中,累加器值为 Array(Array(4,5,6),Array(4,5,6)) 在 RDD 之外,累加器值为 Array(Array(4,5,6),Array(0,0,0))
但我在 RDD 之外期望的是 Array(Array(4,5,6),Array(4,5,6))
最佳答案
addAccumulator 方法在accumulator.variable有更新时被调用
在上面的代码中,accumulator += (x(0),0,0) 调用 addAccumulator 方法。
一旦所有任务完成,就会调用addInPlace方法来聚合所有任务的累计值。
在上面的代码中,initialValue Array(1, 1, 1)Array(1, 1, 1) 和任务 Accumulator value Array(4, 5, 6) Array(4, 5, 6) 调用 addInPlace 方法。
在上面的代码中,addInPlace 方法中的变量 i 每当进入循环时都必须重置 while (j < columnLength) {
以下代码非常有效。
while (j < columnLength) {
i=0
while (i < rowLength) {
println("m1(j)(i)"+ m1(j)(i))
println(" m2(j)(i))"+ m2(j)(i))
val a = Math.max(m1(j)(i), m2(j)(i))
updatedMatrix(j)(i) = a
i += 1
}
j += 1
}
关于apache-spark - Spark 累加器值在 RDD 内部和 RDD 外部时不同,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27357440/