scala - 计算一行的秩

标签 scala apache-spark dataframe hive apache-spark-sql

我想根据一个字段对用户 ID 进行排名。对于相同的字段值,排名应该相同。该数据在 Hive 表中。

例如

user value
a       5
b       10
c       5
d       6

Rank
a - 1
c - 1
d - 3
b - 4

我该怎么做?

最佳答案

可以通过 DataFrame API 使用 rank 窗口函数:

import org.apache.spark.sql.functions.rank
import org.apache.spark.sql.expressions.Window

val w = Window.orderBy($"value")

val df = sc.parallelize(Seq(
  ("a", 5), ("b", 10), ("c", 5), ("d", 6)
)).toDF("user", "value")

df.select($"user", rank.over(w).alias("rank")).show

// +----+----+
// |user|rank|
// +----+----+
// |   a|   1|
// |   c|   1|
// |   d|   3|
// |   b|   4|
// +----+----+

或原始 SQL:

df.registerTempTable("df")
sqlContext.sql("SELECT user, RANK() OVER (ORDER BY value) AS rank FROM df").show

// +----+----+
// |user|rank|
// +----+----+
// |   a|   1|
// |   c|   1|
// |   d|   3|
// |   b|   4|
// +----+----+

但是效率极低。

您也可以尝试使用 RDD API,但这并不十分简单。首先让我们将 DataFrame 转换为 RDD:

import org.apache.spark.sql.Row
import org.apache.spark.rdd.RDD
import org.apache.spark.RangePartitioner

val rdd: RDD[(Int, String)] = df.select($"value", $"user")
  .map{ case Row(value: Int, user: String) => (value, user) }

val partitioner = new RangePartitioner(rdd.partitions.size,  rdd)
val sorted =  rdd.repartitionAndSortWithinPartitions(partitioner)

接下来我们必须计算每个分区的排名:

def rank(iter: Iterator[(Int,String)]) =  {
  val zero = List((-1L, Integer.MIN_VALUE, "", 1L))

  def f(acc: List[(Long,Int,String,Long)], x: (Int, String)) = 
    (acc.head, x) match {
      case (
        (prevRank: Long, prevValue: Int, _, offset: Long),
        (currValue: Int, label: String)) => {
      val newRank = if (prevValue == currValue) prevRank else prevRank + offset
      val newOffset = if (prevValue == currValue) offset + 1L else 1L
      (newRank, currValue, label, newOffset) :: acc
    }
  }

  iter.foldLeft(zero)(f).reverse.drop(1).map{case (rank, _, label, _) =>
    (rank, label)}.toIterator
}


val partRanks = sorted.mapPartitions(rank)

每个分区的偏移量

def getOffsets(sorted: RDD[(Int, String)]) = sorted
  .mapPartitionsWithIndex((i: Int, iter: Iterator[(Int, String)]) => 
    Iterator((i, iter.size)))
  .collect
  .foldLeft(List((-1, 0)))((acc: List[(Int, Int)], x: (Int, Int)) => 
    (x._1, x._2 + acc.head._2) :: acc)
  .toMap

val offsets = sc.broadcast(getOffsets(sorted))

和最终排名:

def adjust(i: Int, iter: Iterator[(Long, String)]) = 
  iter.map{case (rank, label) => (rank + offsets.value(i - 1).toLong, label)}

val ranks = partRanks
  .mapPartitionsWithIndex(adjust)
  .map{case (i, label) => (1 + i , label)}

关于scala - 计算一行的秩,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33729787/

相关文章:

scala - [SlickException : Read NULL value for column (USERS/670412212). LOGIN_ID]

scala - Spark Scala:无法导入sqlContext.implicits._

arrays - 从给定点列出二维数组对角线元素的最快方法

apache-spark - PySpark 1.6.2 | orderBy/sort 之后的collect()

scala - 迭代数据框中的每一行,将其存储在 val 中并作为参数传递给 Spark SQL 查询

python - 有没有更好的方法来索引数据帧?

python pandas如何将不同数据框中的其他值乘以列

r - 分组变量选择第一行(保留一列),最后一行(保留不同的列)

mongodb - Salat GRATER GLITCH ClassNotFoundException

python - PySpark 广播变量连接