scala - 将多个 map 与 map 值合并为自定义案例类实例

标签 scala apache-spark apache-spark-sql maps user-defined-functions

我想使用 Spark/Scala 合并多个 map 。这些 map 有一个案例类实例作为值。

相关代码如下:

case class SampleClass(value1:Int,value2:Int)

val sampleDataDs = Seq(
      ("a",25,Map(1->SampleClass(1,2))),
      ("a",32,Map(1->SampleClass(3,4),2->SampleClass(1,2))),
      ("b",16,Map(1->SampleClass(1,2))),
      ("b",18,Map(2->SampleClass(10,15)))).toDF("letter","number","maps")

输出:

+------+-------+--------------------------+
|letter|number |maps                      |
+------+-------+--------------------------+
|a     |  25   | [1-> [1,2]]              |
|a     |  32   | [1-> [3,4], 2 -> [1,2]]  |
|b     |  16   | [1 -> [1,2]]             |
|b     |  18   | [2 -> [10,15]]           |
+------+-------+--------------------------+

我想根据“字母”列对数据进行分组,以便最终数据集应具有以下预期的最终输出:

+------+---------------------------------+
|letter| maps                            |
+------+---------------------------------+
|a     | [1-> [4,6], 2 -> [1,2]]         |
|b     | [1-> [1,2], 2 -> [10,15]]       |                 
+------+---------------------------------+

我尝试按“字母”分组,然后应用 udf 来聚合 map 中的值。以下是我的尝试:

val aggregatedDs = SampleDataDs.groupBy("letter").agg(collect_list(SampleDataDs("maps")).alias("mapList")) 

输出:

+------+----------------------------------------+
|letter| mapList                                |
+------+-------+--------------------------------+
|a     | [[1-> [1,2]],[1-> [3,4], 2 -> [1,2]]]  |
|b     | [[1-> [1,2]],[2 -> [10,15]]]           |                 
+------+----------------------------------------+ 

在此之后,我尝试编写一个 udf 来合并 collect_list 的输出并获得最终输出:

def mergeMap = udf { valSeq:Seq[Map[Int,SampleClass]] =>
valSeq.flatten.groupBy(_._1).mapValues(x=>(x.map(_._2.value1).reduce(_ + _),x.map(_._2.value2).reduce(_ + _)))
}

val aggMapDs = aggregatedDs.withColumn("aggValues",mergeMap(col("mapList")))

但是它失败并显示错误消息:

执行用户定义函数失败 引起:java.lang.classCastException:org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema无法转换为SampleClass

我的Spark版本是2.3.1。有什么想法如何获得预期的最终输出吗?

最佳答案

问题是由于 UDF 无法接受案例类作为输入。 Spark 数据帧将在内部将您的案例类表示为 Row 对象。因此,可以通过更改 UDF 输入类型来避免该问题,如下所示:

val mergeMap = udf((valSeq:Seq[Map[Int, Row]]) => {
  valSeq.flatten
    .groupBy(_._1)
    .mapValues(x=> 
      SampleClass(
        x.map(_._2.getAs[Int]("value1")).reduce(_ + _),
        x.map(_._2.getAs[Int]("value2")).reduce(_ + _)
      )
    )
})

请注意,上面需要一些小的额外更改来处理 Row 对象。

运行此代码将导致:

val aggMapDs = aggregatedDs.withColumn("aggValues",mergeMap(col("mapList")))

+------+----------------------------------------------+-----------------------------+
|letter|mapList                                       |aggValues                    |
+------+----------------------------------------------+-----------------------------+
|b     |[Map(1 -> [1,2]), Map(2 -> [10,15])]          |Map(2 -> [10,15], 1 -> [1,2])|
|a     |[Map(1 -> [1,2]), Map(1 -> [3,4], 2 -> [1,2])]|Map(2 -> [1,2], 1 -> [4,6])  |
+------+----------------------------------------------+-----------------------------+

关于scala - 将多个 map 与 map 值合并为自定义案例类实例,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57765349/

相关文章:

List.empty vs. List() vs. new List()

scala - 无法在已停止的 SparkContext 上调用方法

scala - 增加喷雾 jar 中的最大内容长度

apache-spark - 如何对多个 Spark 作业并行执行多个 Kafka 主题

scala - 重载方法值 <> 具有替代方案 w/Play 2.1.1 和 PlaySlick

apache-spark - 如何以编程方式提交 spark 作业

amazon-web-services - 在 Glue 作业中创建 Glue 数据目录表

java - Spark SQL Java 无法将元组转换为行和数据帧

python - 统计当前时间 N 天内的发生次数 - pyspark

scala - Spark-读取许多小的 Parquet 文件之前需要获取每个文件的状态