scala - 具有动态数据类型的 UDF

标签 scala apache-spark apache-spark-sql user-defined-functions

我正在尝试编写可以从 Map 中删除几个键的 udf。但是Map的key和value的类型是不固定的,可以是String或者Array什么的。我应该如何定义这样的udf。我使用的是 Spark 版本 2.4.4。

下面是我的 Map[String, string] 的 udf:

val mapKeys = //Seq[String]
val mapFilterUdf = udf[Map[String, String], Map[String, String]] {
    map => map.filter{case (key, _) => mapKeys.contains(key)}
}
mapFilterUdf(dataFrame.col("column_name")).as(column.name)

最佳答案

你可以为 udf 做一个通用的工厂方法:

import scala.reflect.runtime.universe._

def filterUdfFactory[T](mapKeys:Seq[T])(implicit tag:TypeTag[T]) = udf((map:Map[T,T]) => map.filter{case (k,v) => mapKeys.contains(k)})

然后用作例如对于字符串:

val mapKeys = Seq("k1")

val tt = typeTag[String]
val filterUdf = filterUdfFactory[String](mapKeys)

 val df = Seq(
    Map("k1" -> "v1","k2" -> "v2")
 ).toDF("map")

 df.select(filterUdf($"map"))
.show()

给出:

+----------+
|  UDF(map)|
+----------+
|[k1 -> v1]|
+----------+

关于scala - 具有动态数据类型的 UDF,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/67591258/

相关文章:

java - 在 java 中没有 spark-submit 可以吗?

python - Spark : Dangers of using Python

hadoop - Spark 单个记录查找的性能

apache-spark - 在 map 列的 Spark 数据框中如何使用所有键的常量更新值

json - Play 2.1(scala) - 如何为 scala 枚举案例类编写 Format[T]

java - Java/Scala 中的高性能字符串哈希函数

Scala 宏 Liftable 带有前向引用

scala - 什么是节点 [TypeOne < : Node[TypeOne]] in scala mean?

scala - 较长的RDD沿袭导致Stackoverflow

apache-spark - 如何在 SparkSQL 中使用 Dataframe 获取行迭代器