scala - How to find the sum of each partition in Spark

Tags: scala apache-spark rdd partitioning

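I have created a class and used it to build an RDD. I want to compute the sum of LoudnessRate (a class member) for each partition. That sum will later be used to compute the average loudness rate per partition. I tried the following code, but it does not compute the sum and returns 0.0. My code is: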

    import org.apache.spark.{SparkConf, SparkContext}
    import scala.util.Random

    object sparkBAT {
      def main(args: Array[String]): Unit = {
        val numPartitions = 3
        val N = 50
        val d = 5
        val MinVal = -10
        val MaxVal = 10
        val conf = new SparkConf().setMaster("local").setAppName("spark Sum")
        val sc = new SparkContext(conf)

        val ba = List.fill(N)(new BAT(d, MinVal, MaxVal))
        val rdd = sc.parallelize(ba, numPartitions)

        var arrSum = Array.fill(numPartitions)(0.0) // Declare Array that will hold sum for each Partition
        rdd.mapPartitionsWithIndex((k, iterator) => iterator.map(x => arrSum(k) += x.LoudnessRate)).collect()
        arrSum foreach println
      }
    }

    class BAT(dim: Int, min: Double, max: Double) extends Serializable {
      val random = new Random()
      var position: List[Double] = List.fill(dim)(random.nextDouble() * (max - min) + min)
      var velocity: List[Double] = List.fill(dim)(math.random)
      var PulseRate: Double = 0.1
      var LoudnessRate: Double = 0.95
      var frequency: Double = math.random
      var fitness: Double = math.random
      var BestPosition: List[Double] = List.fill(dim)(math.random)
      var BestFitness: Double = math.random
    }



You are modifying arrSum in the executor JVMs and printing its values in the driver JVM. Instead, you can map each partition's iterator to a singleton iterator holding the sum and use collect to move the values to the driver. Also, don't use map for side effects; iterator.foreach is meant for that.
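One reason map is a poor fit for side effects is that Iterator.map is lazy. The following plain-Scala sketch (no Spark involved; LazyMapDemo and run are names made up for this illustration) shows that the function passed to map does not run until the resulting iterator is consumed, whereas foreach runs it immediately:

```scala
object LazyMapDemo {
  // Returns the accumulator value observed right after map and right after foreach.
  def run(): (Double, Double) = {
    val values = List(1.0, 2.0, 3.0)
    var total = 0.0

    // Iterator.map only builds a new lazy iterator; the function body has not run yet.
    val unused = values.iterator.map(x => total += x)
    val afterMap = total

    // foreach consumes the iterator right away, so the side effect happens here.
    values.iterator.foreach(x => total += x)
    val afterForeach = total

    (afterMap, afterForeach)
  }

  def main(args: Array[String]): Unit = {
    val (afterMap, afterForeach) = run()
    println(s"after map: $afterMap, after foreach: $afterForeach")
    // prints "after map: 0.0, after foreach: 6.0"
  }
}
```

Switching to foreach alone would not fix the Spark job, though: Spark ships a serialized copy of the closure to each task, so updates to a captured array are not reliably visible on the driver. The results still have to come back through the RDD, for example via collect.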

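Here is an example snippet showing how to compute the per-partition sums. First, create an RDD with two partitions: 0 -> 1, 2, 3 and 1 -> 4, 5. Of course, you don't need this in your actual code, but because the behavior of sc.parallelize varies with the environment, this always creates the same RDD, which makes the example reproducible: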

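    import org.apache.spark.Partitioner

    // Sends each record to the partition named by its Int key (0 or 1).
    object DemoPartitioner extends Partitioner {
      override def numPartitions: Int = 2
      override def getPartition(key: Any): Int = key match {
        case num: Int => num
      }
    }

    val rdd = sc
      .parallelize(Seq((0, 1), (0, 2), (0, 3), (1, 4), (1, 5)))
      .partitionBy(DemoPartitioner) // place records according to their key
      .map(_._2)                    // drop the key, keep only the value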
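    // Replace each partition's iterator with a single (partition index -> sum) pair,
    // then collect the pairs on the driver.
    val sumsByPartition = rdd.mapPartitionsWithIndex {
      case (partitionNum, it) => Iterator.single(partitionNum -> it.sum)
    }.collect().toMap

Printing sumsByPartition gives:

    Map(0 -> 6, 1 -> 9)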
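Applied to the question, the same pattern might look like the sketch below. It is only an illustration under a few assumptions: Bat is a simplified, hypothetical stand-in for the BAT class that keeps only LoudnessRate, and the names PartitionAverages and sumAndCount are made up here. Each partition returns its sum together with its element count, so the per-partition average can be computed on the driver:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical, simplified stand-in for the BAT class in the question.
class Bat(val LoudnessRate: Double) extends Serializable

object PartitionAverages {
  // Folds one partition's iterator into (sum of LoudnessRate, number of elements).
  def sumAndCount(it: Iterator[Bat]): (Double, Long) =
    it.foldLeft((0.0, 0L)) { case ((s, c), bat) => (s + bat.LoudnessRate, c + 1) }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("partition averages")
    val sc = new SparkContext(conf)
    try {
      val rdd = sc.parallelize(List.fill(50)(new Bat(0.95)), 3)

      // One (partition index, (sum, count)) pair per partition, collected on the driver.
      val stats = rdd.mapPartitionsWithIndex { (idx, it) =>
        Iterator.single((idx, sumAndCount(it)))
      }.collect().toMap

      stats.toSeq.sortBy(_._1).foreach { case (idx, (sum, count)) =>
        // An empty partition has no meaningful average.
        val avg = if (count == 0) Double.NaN else sum / count
        println(s"partition $idx: sum=$sum, count=$count, avg=$avg")
      }
    } finally {
      sc.stop()
    }
  }
}
```

Returning the count alongside the sum avoids a second pass over the data just to learn the partition sizes.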

