apache-spark - Spark 累加器值在 RDD 内部和 RDD 外部时不同

标签 apache-spark accumulator

我的累加器是一个数组[Array[Int]] 在RDD的foreach操作中更新accumulator后,accumulator(0)符合预期,而accumulator(1)是完全丢失的Array(0,0,0)

在 RDD 中,累加器值为 Array(Array(4,5,6),Array(4,5,6)) 在 RDD 之外,累加器值为 Array(Array(4,5,6),Array(0,0,0))

下面是代码

import org.apache.spark.Accumulable
import org.apache.spark.AccumulableParam
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object acc {
  def main(args: Array[String]) {
     val conf = new SparkConf().setAppName("Simple Application")
  val sc = new SparkContext(conf)
  val a =Array(Array(1,2,3),Array(4,5,6))
  val rdd = sc.parallelize(a)
  val initialValue = Array.fill[Array[Int]](2)(Array.fill[Int](3)(1))
  val accumulator = sc.accumulable(initialValue)(MatrixAccumulatorParam)
  rdd.foreach{x=>
     accumulator += (x(0),0,0)
     accumulator += (x(1),0,1)
     accumulator += (x(2),0,2)
     accumulator += (x(0),1,0)
     accumulator += (x(1),1,1)
     accumulator += (x(2),1,2)
     println("accumulator value in rdd is"+accumulator.localValue)
     }

  println("accumulator value out of rdd is :" + accumulator.value )

  }

}
object MatrixAccumulatorParam extends AccumulableParam[Array[Array[Int]], (Int, Int,   Int)] {

  def zero(initialValue: Array[Array[Int]]): Array[Array[Int]] = {
    initialValue
  }

  def addAccumulator(acc: Array[Array[Int]], value: (Int, Int, Int)): Array[Array[Int]] = {

    acc(value._2)(value._3) = value._1
    acc

  }

   def addInPlace(m1: Array[Array[Int]], m2: Array[Array[Int]]): Array[Array[Int]] = {
    val columnLength: Int = m1.length
    val rowLength: Int = m1(0).length
    var updatedMatrix = Array.ofDim[Int](columnLength, rowLength)

    var j: Int = 0
    while (j < columnLength) {
      var i =0
    while (i < rowLength) {
         val a = Math.max(m1(j)(i), m2(j)(i))
        updatedMatrix(j)(i) = a
        i += 1
      } 
      j += 1
    }

    updatedMatrix
      }


}

结果: 在 RDD 中,累加器值为 Array(Array(4,5,6),Array(4,5,6)) 在 RDD 之外,累加器值为 Array(Array(4,5,6),Array(0,0,0))

但我在 RDD 之外期望的是 Array(Array(4,5,6),Array(4,5,6))

最佳答案

addAccumulator 方法在accumulator.variable有更新时被调用

在上面的代码中,accumulator += (x(0),0,0) 调用 addAccumulator 方法。

一旦所有任务完成,就会调用addInPlace方法来聚合所有任务的累计值。

在上面的代码中,initialValue Array(1, 1, 1)Array(1, 1, 1) 和任务 Accumulator value Array(4, 5, 6) Array(4, 5, 6) 调用 addInPlace 方法。

在上面的代码中,addInPlace 方法中的变量 i 每当进入循环时都必须重置 while (j < columnLength) {

以下代码非常有效。

            while (j < columnLength) {
              i=0
                while (i < rowLength) {
                  println("m1(j)(i)"+ m1(j)(i))
                  println(" m2(j)(i))"+ m2(j)(i))
                    val a = Math.max(m1(j)(i), m2(j)(i))
                            updatedMatrix(j)(i) = a
                            i += 1
                } 
                j += 1
            }

关于apache-spark - Spark 累加器值在 RDD 内部和 RDD 外部时不同,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27357440/

相关文章:

apache-spark - 令人困惑的 Spark 警告 (DSE 4.8.4)

java - 在 Spark 中将数据集应用为广播

java - 使用 for 循环遍历对象和方法列表

scala - 如何创建自定义集合累加器,即 Set[String]?

json - Scala Spark - 将 JSON 列拆分为多列

apache-spark - 在Spark中读取ORC文件时如何保留分区列

scala - 如何使用累加器统计leftOuterJoin中没有匹配项的记录?

arrays - 使用递归重写 Ruby #inject (#reduce)?

计数器和累加器不工作并导致程序崩溃。我究竟做错了什么?

apache-spark - DStream updateStateByKey更新函数实现