scala - Spark 列明智的字数统计

标签 scala apache-spark summary

我们正在尝试在 spark 中生成数据集的按列统计。除了使用统计库中的汇总功能。我们正在使用以下程序:

  1. 我们确定具有字符串值的列

  2. 为整个数据集生成键值对,以列号为键,列的值为值

  3. 生成一个新的格式图

    (K,V) ->((K,V),1)

然后我们使用 reduceByKey 来查找所有列中所有唯一值的总和。我们缓存此输出以减少进一步的计算时间。

在下一步中,我们使用 for 循环遍历列以查找所有列的统计信息。

我们正在尝试通过再次使用 map reduce 方法来减少 for 循环,但我们无法找到某种方法来实现它。这样做将允许我们在一次执行中为所有列生成列统计信息。 for 循环方法按顺序运行使其非常慢。

代码:

//drops the header

    def dropHeader(data: RDD[String]): RDD[String] = {
         data.mapPartitionsWithIndex((idx, lines) => {
           if (idx == 0) {
             lines.drop(1)
           }
           lines
         })
       }

    def retAtrTuple(x: String) = {
       val newX = x.split(",")
       for (h <- 0 until newX.length) 
          yield (h,newX(h))
    }



    val line = sc.textFile("hdfs://.../myfile.csv")

    val withoutHeader: RDD[String] = dropHeader(line)

    val kvPairs = withoutHeader.flatMap(retAtrTuple) //generates a key-value pair where key is the column number and value is column's value


    var bool_numeric_col = kvPairs.map{case (x,y) => (x,isNumeric(y))}.reduceByKey(_&&_).sortByKey()    //this contains column indexes as key and boolean as value (true for numeric and false for string type)

    var str_cols = bool_numeric_col.filter{case (x,y) => y == false}.map{case (x,y) => x}
    var num_cols = bool_numeric_col.filter{case (x,y) => y == true}.map{case (x,y) => x}

    var str_col = str_cols.toArray   //array consisting the string col
    var num_col = num_cols.toArray   //array consisting numeric col


    val colCount = kvPairs.map((_,1)).reduceByKey(_+_)
    val e1 = colCount.map{case ((x,y),z) => (x,(y,z))}
    var numPairs = e1.filter{case (x,(y,z)) => str_col.contains(x) }

    //running for loops which needs to be parallelized/optimized as it sequentially operates on each column. Idea is to find the top10, bottom10 and number of distinct elements column wise
    for(i <- str_col){
       var total = numPairs.filter{case (x,(y,z)) => x==i}.sortBy(_._2._2)
       var leastOnes = total.take(10)
       println("leastOnes for Col" + i)
       leastOnes.foreach(println)
       var maxOnes = total.sortBy(-_._2._2).take(10)
       println("maxOnes for Col" + i)
       maxOnes.foreach(println)
       println("distinct for Col" + i + " is " + total.count)
    }

最佳答案

让我稍微简化一下您的问题。 (实际上很多。)我们有一个 RDD[(Int, String)]我们想找到前 10 个最常见的 String每个Int (都在 0–100 范围内)。

与排序不同,如您的示例所示,使用 Spark 内置的 RDD.top(n) 效率更高。方法。它的运行时间与数据大小成线性关系,并且需要移动的数据比排序少得多。

考虑 top 的实现在 RDD.scala .你想做同样的事情,但每个 Int 有一个优先级队列(堆) key 。代码变得相当复杂:

import org.apache.spark.util.BoundedPriorityQueue // Pretend it's not private.

def top(n: Int, rdd: RDD[(Int, String)]): Map[Int, Iterable[String]] = {
  // A heap that only keeps the top N values, so it has bounded size.
  type Heap = BoundedPriorityQueue[(Long, String)]
  // Get the word counts.
  val counts: RDD[[(Int, String), Long)] =
    rdd.map(_ -> 1L).reduceByKey(_ + _)
  // In each partition create a column -> heap map.
  val perPartition: RDD[Map[Int, Heap]] =
    counts.mapPartitions { items =>
      val heaps =
        collection.mutable.Map[Int, Heap].withDefault(i => new Heap(n))
      for (((k, v), count) <- items) {
        heaps(k) += count -> v
      }
      Iterator.single(heaps)
    }
  // Merge the per-partition heap maps into one.
  val merged: Map[Int, Heap] =
    perPartition.reduce { (heaps1, heaps2) =>
      val heaps =
        collection.mutable.Map[Int, Heap].withDefault(i => new Heap(n))
      for ((k, heap) <- heaps1.toSeq ++ heaps2.toSeq) {
        for (cv <- heap) {
          heaps(k) += cv
        }
      }
      heaps
    }
  // Discard counts, return just the top strings.
  merged.mapValues(_.map { case(count, value) => value })
}

这很有效,但也很痛苦,因为我们需要同时处理多个列。拥有一个 RDD 会容易得多每列,只需调用 rdd.top(10)在每个。

不幸的是,将 RDD 拆分为 N 个较小的 RDD 的天真方法执行了 N 次传递:

def split(together: RDD[(Int, String)], columns: Int): Seq[RDD[String]] = {
  together.cache // We will make N passes over this RDD.
  (0 until columns).map {
    i => together.filter { case (key, value) => key == i }.values
  }
}

一个更有效的解决方案可能是通过键将数据写出到单独的文件中,然后将其加载回单独的 RDD。这在 Write to multiple outputs by key Spark - one Spark job 中讨论.

关于scala - Spark 列明智的字数统计,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28166190/

相关文章:

scala - 是否可以将翻转实现为 Scala 函数(而不是方法)

scala - Play 2.0 - Build.scala - 从 pom.xml 转换

python - Spark 矩阵的基本线性代数

c# - 如何在另一个类中继承 C# 的摘要? (像继承 Java)

javascript - JavaScript 函数对象与 Scala 函数对象有何不同?

scala - 如何处理 playframework Controller 内的 future 选项?

python - PySpark groupByKey 返回 pyspark.resultiterable.ResultIterable

python - 无法导入 SparkContext

mysql计算前一行时间戳差异并按多个对象分组

php - 如何在 MYSQL 中获取汇总计数