我想使用 Spark/Scala 合并多个 map 。这些 map 有一个案例类实例作为值。
相关代码如下:
case class SampleClass(value1:Int,value2:Int)
val sampleDataDs = Seq(
("a",25,Map(1->SampleClass(1,2))),
("a",32,Map(1->SampleClass(3,4),2->SampleClass(1,2))),
("b",16,Map(1->SampleClass(1,2))),
("b",18,Map(2->SampleClass(10,15)))).toDF("letter","number","maps")
输出:
+------+-------+--------------------------+
|letter|number |maps |
+------+-------+--------------------------+
|a | 25 | [1-> [1,2]] |
|a | 32 | [1-> [3,4], 2 -> [1,2]] |
|b | 16 | [1 -> [1,2]] |
|b | 18 | [2 -> [10,15]] |
+------+-------+--------------------------+
我想根据“字母”列对数据进行分组,以便最终数据集应具有以下预期的最终输出:
+------+---------------------------------+
|letter| maps |
+------+---------------------------------+
|a | [1-> [4,6], 2 -> [1,2]] |
|b | [1-> [1,2], 2 -> [10,15]] |
+------+---------------------------------+
我尝试按“字母”分组,然后应用 udf 来聚合 map 中的值。以下是我的尝试:
val aggregatedDs = SampleDataDs.groupBy("letter").agg(collect_list(SampleDataDs("maps")).alias("mapList"))
输出:
+------+----------------------------------------+
|letter| mapList |
+------+-------+--------------------------------+
|a | [[1-> [1,2]],[1-> [3,4], 2 -> [1,2]]] |
|b | [[1-> [1,2]],[2 -> [10,15]]] |
+------+----------------------------------------+
在此之后,我尝试编写一个 udf 来合并 collect_list
的输出并获得最终输出:
def mergeMap = udf { valSeq:Seq[Map[Int,SampleClass]] =>
valSeq.flatten.groupBy(_._1).mapValues(x=>(x.map(_._2.value1).reduce(_ + _),x.map(_._2.value2).reduce(_ + _)))
}
val aggMapDs = aggregatedDs.withColumn("aggValues",mergeMap(col("mapList")))
但是它失败并显示错误消息:
执行用户定义函数失败
引起:java.lang.classCastException:org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema无法转换为SampleClass
我的Spark版本是2.3.1。有什么想法如何获得预期的最终输出吗?
最佳答案
问题是由于 UDF
无法接受案例类作为输入。 Spark 数据帧将在内部将您的案例类表示为 Row 对象。因此,可以通过更改 UDF
输入类型来避免该问题,如下所示:
val mergeMap = udf((valSeq:Seq[Map[Int, Row]]) => {
valSeq.flatten
.groupBy(_._1)
.mapValues(x=>
SampleClass(
x.map(_._2.getAs[Int]("value1")).reduce(_ + _),
x.map(_._2.getAs[Int]("value2")).reduce(_ + _)
)
)
})
请注意,上面需要一些小的额外更改来处理 Row 对象。
运行此代码将导致:
val aggMapDs = aggregatedDs.withColumn("aggValues",mergeMap(col("mapList")))
+------+----------------------------------------------+-----------------------------+
|letter|mapList |aggValues |
+------+----------------------------------------------+-----------------------------+
|b |[Map(1 -> [1,2]), Map(2 -> [10,15])] |Map(2 -> [10,15], 1 -> [1,2])|
|a |[Map(1 -> [1,2]), Map(1 -> [3,4], 2 -> [1,2])]|Map(2 -> [1,2], 1 -> [4,6]) |
+------+----------------------------------------------+-----------------------------+
关于scala - 将多个 map 与 map 值合并为自定义案例类实例,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57765349/