scala - 如何在 groupBy 之后聚合 map 列?

标签 scala apache-spark apache-spark-sql

我需要合并两个数据框并按键组合列。这两个datafrmaes具有相同的架构,例如:

root
|-- id: String (nullable = true)
|-- cMap: map (nullable = true)
|    |-- key: string
|    |-- value: string (valueContainsNull = true)

我想按“id”分组并将“cMap”聚合在一起以进行重复数据删除。 我试过代码:

val df = df_a.unionAll(df_b).groupBy("id").agg(collect_list("cMap") as "cMap").
rdd.map(x => {
    var map = Map[String,String]()
    x.getAs[Seq[Map[String,String]]]("cMap").foreach( y => 
        y.foreach( tuple =>
        {
            val key = tuple._1
            val value = tuple._2
            if(!map.contains(key))//deduplicate
                map += (key -> value)
        }))

    Row(x.getAs[String]("id"),map)
    })

但似乎 collect_list 不能用于映射结构:

org.apache.spark.sql.AnalysisException: No handler for Hive udf class org.apache.hadoop.hive.ql.udf.generic.GenericUDAFCollectList because: Only primitive type arguments are accepted but map<string,string> was passed as parameter 1..;

这个问题还有其他解决方案吗?

最佳答案

您必须首先在映射列上使用 explode 函数以 destructure 映射到键和值列,union 结果数据集,然后distinct 去重复,然后只有 groupBy 使用一些自定义 Scala 编码来聚合 map 。

别说了,我们来写代码吧……

给定数据集:

scala> a.show(false)
+---+-----------------------+
|id |cMap                   |
+---+-----------------------+
|one|Map(1 -> one, 2 -> two)|
+---+-----------------------+

scala> a.printSchema
root
 |-- id: string (nullable = true)
 |-- cMap: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

scala> b.show(false)
+---+-------------+
|id |cMap         |
+---+-------------+
|one|Map(1 -> one)|
+---+-------------+

scala> b.printSchema
root
 |-- id: string (nullable = true)
 |-- cMap: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

您应该首先使用 explode map 列上的功能。

explode(e: Column): Column Creates a new row for each element in the given array or map column.

val a_keyValues = a.select('*, explode($"cMap"))
scala> a_keyValues.show(false)
+---+-----------------------+---+-----+
|id |cMap                   |key|value|
+---+-----------------------+---+-----+
|one|Map(1 -> one, 2 -> two)|1  |one  |
|one|Map(1 -> one, 2 -> two)|2  |two  |
+---+-----------------------+---+-----+

val b_keyValues = b.select('*, explode($"cMap"))

使用以下内容,您可以获得不同的键值对,这正是您要求的重复数据删除。

val distinctKeyValues = a_keyValues.
  union(b_keyValues).
  select("id", "key", "value").
  distinct // <-- deduplicate
scala> distinctKeyValues.show(false)
+---+---+-----+
|id |key|value|
+---+---+-----+
|one|1  |one  |
|one|2  |two  |
+---+---+-----+

groupBy 的时间并创建最终的 map 列。

val result = distinctKeyValues.
  withColumn("map", map($"key", $"value")).
  groupBy("id").
  agg(collect_list("map")).
  as[(String, Seq[Map[String, String]])]. // <-- leave Rows for typed pairs
  map { case (id, list) => (id, list.reduce(_ ++ _)) }. // <-- collect all entries under one map
  toDF("id", "cMap") // <-- give the columns their names
scala> result.show(truncate = false)
+---+-----------------------+
|id |cMap                   |
+---+-----------------------+
|one|Map(1 -> one, 2 -> two)|
+---+-----------------------+

请注意,从 Spark 2.0.0 开始 unionAll已被弃用,union 是正确的联合运算符:

(Since version 2.0.0) use union()

关于scala - 如何在 groupBy 之后聚合 map 列?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44234397/

相关文章:

python - PySpark 数据帧 : Find closest value and slice the DataFrame

scala - 如何检查 Future[Option] 列表中是否存在 None

apache-spark - Spark主内存需求与数据大小相关

python - 删除或加速 PySpark 中的显式 for 循环

scala - Spark、Scala 中的数组操作

apache-spark - 为 spark thrift 服务器提供仓库目录的路径

scala - 在Scala中,有一种简洁而简单的方法来比较一个值和多个值

xml - scala.xml.parsing.ConstructingParser 拆分文本内容

scala - 从源代码构建 Scala 2.12 二进制文件

apache-spark - Spark 中的 Metastore 是什么?