scala - 如何查找 Spark 中每个分区的总和

标签 scala apache-spark rdd partitioning

我已经创建了类并使用该类来创建 RDD。我想计算每个分区的 LoudnessRate (类成员)的总和。该总和稍后将用于计算每个分区的平均响度率。 我尝试过以下代码,但它不计算 Sum 并返回 0.0。 我的代码是

    object sparkBAT {
      def main(args: Array[String]): Unit = {
        val numPartitions = 3
        val N = 50
        val d = 5
        val MinVal = -10
        val MaxVal =  10
        val conf = new SparkConf().setMaster(locally("local")).setAppName("spark Sum")
        val sc = new SparkContext(conf)

        val ba = List.fill(N)(new BAT(d, MinVal, MaxVal))
        val rdd = sc.parallelize(ba, numPartitions)

        var arrSum =Array.fill(numPartitions)(0.0) // Declare Array that will hold sum for each Partition
        rdd.mapPartitionsWithIndex((k,iterator) => iterator.map(x => arrSum(k) += x.LoudnessRate)).collect()
        arrSum foreach println
      }
    }


    class BAT (dim:Int, min:Double, max:Double) extends Serializable {    
      val random = new Random()
      var position      : List[Double]      =   List.fill(dim) (random.nextDouble() * (max-min)+min )
      var velocity      :List[Double]       =   List.fill(dim)( math.random)
      var PulseRate     : Double            =   0.1
      var LoudnessRate  :Double             =   0.95
      var frequency     :Double             =   math.random
      var fitness       :Double             =   math.random
      var BestPosition  :List[Double]       =   List.fill(dim)(math.random)
      var BestFitness   :Double             =   math.random 
    }

最佳答案

根据要求将我的评论更改为答案。原评论

You are modifying arrSum in executor JVMs and printing its values in the dirver JVM. You can map the iterators to singleton iterators and use collect to move the values to the driver. Also, don't use iterator.map for side-effects, iterator.foreach is meant for that.

这里是一个示例片段如何做到这一点。首先创建一个具有两个分区的 RDD,0 -> 1,2,31 -> 4,5。当然,您在实际代码中不需要它,但随着 sc.parallelize 行为根据环境而变化,这将始终创建统一的 RDD 来重现:

object DemoPartitioner extends Partitioner {
  override def numPartitions: Int = 2
  override def getPartition(key: Any): Int = key match {
    case num: Int => num
  }
}
val rdd = sc
  .parallelize(Seq((0, 1), (0, 2), (0, 3), (1, 4), (1, 5)))
  .partitionBy(DemoPartitioner)
  .map(_._2)

然后是实际的技巧:

val sumsByPartition = rdd.mapPartitionsWithIndex {
  case (partitionNum, it) => Iterator.single(partitionNum -> it.sum)
}.collect().toMap
println(sumsByPartition)

输出:

Map(0 -> 6, 1 -> 9)

关于scala - 如何查找 Spark 中每个分区的总和,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56660996/

相关文章:

algorithm - 有没有比命令式算法更快的函数式算法?

python - 如何在 PySpark 的 RDD 的列中查找标准偏差

position - 如何获取Spark RDD中元素的位置?

python - PySpark:when子句中的多个条件

apache-spark - 在 Spark 中映射列表的每个元素

scala - Spark 数据集中的 groupByKey

scala - 蛋糕图案——为什么这么复杂

scala - 如何从 Scala 中的 DataFrame 在 Spark 中创建分布式稀疏矩阵

scala - 在 Spark SQL 中,如何注册和使用通用 UDF?

scala - 使用本地时增加Spark内存[*]