scala - 如何使用 Spark 计算累积和

标签 scala apache-spark

我有一个(String,Int)的rdd,它按键排序

val data = Array(("c1",6), ("c2",3),("c3",4))
val rdd = sc.parallelize(data).sortByKey

现在我想以零开始第一个键的值,并将后续键作为前面键的总和。

例如:c1 = 0 , c2 = c1 的值 , c3 = (c1 值 +c2 值) , c4 = (c1+..+c3 值) 预期输出:

(c1,0), (c2,6), (c3,9)...

有可能实现这个目标吗? 我用 map 尝试过,但总和未保留在 map 内。

var sum = 0 ;
val t = keycount.map{ x => { val temp = sum; sum = sum + x._2 ; (x._1,temp); }}

最佳答案

  1. 计算每个分区的部分结果:

    val partials = rdd.mapPartitionsWithIndex((i, iter) => {
      val (keys, values) = iter.toSeq.unzip
      val sums  = values.scanLeft(0)(_ + _)
      Iterator((keys.zip(sums.tail), sums.last))
    })
    
  2. 收集部分总和

    val partialSums = partials.values.collect
    
  3. 计算分区的累积和并广播它:

    val sumMap = sc.broadcast(
      (0 until rdd.partitions.size)
        .zip(partialSums.scanLeft(0)(_ + _))
        .toMap
    )
    
  4. 计算最终结果:

    val result = partials.keys.mapPartitionsWithIndex((i, iter) => {
      val offset = sumMap.value(i)
      if (iter.isEmpty) Iterator()
      else iter.next.map{case (k, v) => (k, v + offset)}.toIterator
    })
    

关于scala - 如何使用 Spark 计算累积和,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35154267/

相关文章:

scala - 如何在 Build.scala 中设置 sbt-proguard 插件

scala - 无法使用 Gradle 运行 Scalatest

apache-spark - Spark java.io.IOException 由于未知原因

scala - 如何计算代币?

scala - 如何在 Scala 中创建时间戳序列

scala - Scala 中的孤儿 future

Scala:如何组合两个数据框?

apache-spark - Apache Spark 和 Apache Apex 之间有什么区别?

date - 在 PySpark 数据框中从一列到另一列的最近日期

amazon-s3 - 使用 3 种方法在 Spark 程序上设置 AWS 凭证,但都不起作用