scala - Spark dataframe groupby 多次

标签 scala apache-spark

val df = (Seq((1, "a", "10"),(1,"b", "12"),(1,"c", "13"),(2, "a", "14"),
              (2,"c", "11"),(1,"b","12" ),(2, "c", "12"),(3,"r", "11")).
          toDF("col1", "col2", "col3"))

所以我有一个包含 3 列的 Spark 数据框。

+----+----+----+
|col1|col2|col3|
+----+----+----+
|   1|   a|  10|
|   1|   b|  12|
|   1|   c|  13|
|   2|   a|  14|
|   2|   c|  11|
|   1|   b|  12|
|   2|   c|  12|
|   3|   r|  11|
+----+----+----+

我的要求实际上是我需要执行两个级别的 groupby,如下所述。

1级: 如果我对 col1 进行 groupby 并对 Col3 进行求和。我将得到下面两列。 1.第1栏 2. 总和(第3列) 我将在这里失去 col2。

2级: 如果我想再次对 col1 和 col2 进行分组并对 Col3 求和,我将得到以下 3 列。 1.第1栏 2.第2栏 3. sum(col3)

我的要求实际上是我需要执行两个级别的 groupBy 并将这两列(level1 的 sum(col3),level2 的 sum(col3))放在最后一个数据帧中。

我该怎么做,谁能解释一下?

Spark :1.6.2 斯卡拉:2.10

最佳答案

一种选择是分别进行两个求和,然后将它们连接回来:

(df.groupBy("col1", "col2").agg(sum($"col3").as("sum_level2")).
    join(df.groupBy("col1").agg(sum($"col3").as("sum_level1")), Seq("col1")).show)

+----+----+----------+----------+
|col1|col2|sum_level2|sum_level1|
+----+----+----------+----------+
|   2|   c|      23.0|      37.0|
|   2|   a|      14.0|      37.0|
|   1|   c|      13.0|      47.0|
|   1|   b|      24.0|      47.0|
|   3|   r|      11.0|      11.0|
|   1|   a|      10.0|      47.0|
+----+----+----------+----------+
<小时/>

另一种选择是使用窗口函数,考虑到 level1_sum 是按 col1 分组的 level2_sum 之和:

import org.apache.spark.sql.expressions.Window
val w = Window.partitionBy($"col1")

(df.groupBy("col1", "col2").agg(sum($"col3").as("sum_level2")).
    withColumn("sum_level1", sum($"sum_level2").over(w)).show)

+----+----+----------+----------+
|col1|col2|sum_level2|sum_level1|
+----+----+----------+----------+
|   1|   c|      13.0|      47.0|
|   1|   b|      24.0|      47.0|
|   1|   a|      10.0|      47.0|
|   3|   r|      11.0|      11.0|
|   2|   c|      23.0|      37.0|
|   2|   a|      14.0|      37.0|
+----+----+----------+----------+

关于scala - Spark dataframe groupby 多次,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41771327/

相关文章:

regex - 如何在 Scala Spark 中将空字符串替换为 N/A?

scala - 安装 Spline 时出错(Spark 的数据沿袭工具)

apache-spark - Spark 检查点行为

scala - 从Scala将UDF注册到SqlContext以在PySpark中使用

java - 为什么运行带有 "./"前缀的 sbt?它不起作用

scala - playframework scala - 找不到 controllers.Application

scala - 何时在 Scala 特征中使用 val 或 def?

scala - 如何修复我在 Scala 中部分求和的实现?

hadoop - 如果没有,如何处理 Spark RDD 分区。执行者 < 没有。 RDD分区

scala - 如何使用 saveAsTextFile 在 spark 数据框中进行自定义分区