apache-spark - 如何使用 groupBy 将行收集到 map 中?

标签 apache-spark apache-spark-sql

上下文

sqlContext.sql(s"""
SELECT
school_name,
name,
age
FROM my_table
""")

询问

鉴于上表,我想按学校名称分组并将名称、年龄收集到 Map[String, Int]

例如 - 伪代码

val df = sqlContext.sql(s"""
SELECT
school_name,
age
FROM my_table
GROUP BY school_name
""")


------------------------
school_name | name  | age
------------------------
school A | "michael"| 7 
school A | "emily"  | 5
school B | "cathy"  | 10
school B | "shaun"  | 5


df.groupBy("school_name").agg(make_map)

------------------------------------
school_name | map
------------------------------------
school A    | {"michael": 7, "emily": 5}
school B    | {"cathy": 10, "shaun": 5}

最佳答案

以下内容适用于 Spark 2.0。您可以使用map自 2.0 版本以来可用的函数可将列获取为 Map。

val df1 = df.groupBy(col("school_name")).agg(collect_list(map($"name",$"age")) as "map")
df1.show(false)

这将为您提供以下输出。

+-----------+------------------------------------+
|school_name|map                                 |
+-----------+------------------------------------+
|school B   |[Map(cathy -> 10), Map(shaun -> 5)] |
|school A   |[Map(michael -> 7), Map(emily -> 5)]|
+-----------+------------------------------------+

现在您可以使用 UDF 将各个 Map 连接成单个 Map,如下所示。

import org.apache.spark.sql.functions.udf
val joinMap = udf { values: Seq[Map[String,Int]] => values.flatten.toMap }

val df2 = df1.withColumn("map", joinMap(col("map")))
df2.show(false)

这将为 Map[String,Int] 提供所需的输出。

+-----------+-----------------------------+
|school_name|map                          |
+-----------+-----------------------------+
|school B   |Map(cathy -> 10, shaun -> 5) |
|school A   |Map(michael -> 7, emily -> 5)|
+-----------+-----------------------------+

如果您想将列值转换为 JSON 字符串,则 Spark 2.1.0 引入了 to_json功能。

val df3 = df2.withColumn("map",to_json(struct($"map")))
df3.show(false)

to_json 函数将返回以下输出。

+-----------+-------------------------------+
|school_name|map                            |
+-----------+-------------------------------+
|school B   |{"map":{"cathy":10,"shaun":5}} |
|school A   |{"map":{"michael":7,"emily":5}}|
+-----------+-------------------------------+

关于apache-spark - 如何使用 groupBy 将行收集到 map 中?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41819275/

相关文章:

Scala 将 Json 文件读取为 Json

r - 是否可以将 Spark 中的 data.table 与 Spark Dataframes 一起使用?

scala - spark 文本文件加载文件而不是行

scala - Spark 1.5.2 : Filtering a dataframe in Scala

apache-spark - 为什么在完成作业和关闭 Spark 之间会发生磁盘繁忙尖峰?

python - 如何使用列值作为 PySpark 中字典的键?

sql - Spark.sql() 的 REGEXP_REPLACE

apache-spark - 将长毫秒的纪元时间转换为结构化流式 SQL 的时间戳

apache-spark - SPARK中提供了HIVE表,但未在Hive CLI中显示

sql - 如何在不单独指定每一列的情况下在所有行中搜索文本