java - Spark 从列中获取udf名称并执行

标签 java apache-spark apache-spark-sql

我注册了一些udfs,都有相同的输入参数类型和相同的输出类型(String)。 假设是 udf1、udf2、udf3。都有不同的功能。

在我的数据集中,我有多个列,在一列中,我有我想在这一行数据上执行的 udf 的名称。

数据集示例:

+---+-------+-------+
|A  |   B   |udf    |
+---+-------+-------+
|1  |   a   |udf1   |
|2  |   b   |udf2   |
|3  |   c   |udf3   |
+---+-------+-------+

我想做这样的事情:

ds.withColumn("TEST", functions.callUDF(<name of right udf>, col("A"), col("B"))

我怎样才能做到这一点?是否可能,如果不可能,可能的解决方法是什么?

背景:我的 Spark 作业有一组 UDF,我想为该行动态执行正确的 udf。

最佳答案

试试这个::

def func1(y: Int, z: String): String = y+z
def func2(y: Int, z: String): String = y+","+z
def default(y: Int, z: String): String = y

val udfName = udf({ (x: String, y: Int, z: String) => x match {
case "func1" => func1(y,z)
case "func2" => func2(y,z)
case _ => default(y,z)
}})

val data = Seq((1,"a","func1"),
(2,"b","func2")
).toDF("A", "B", "udf")

data.withColumn("TEST", udfName(col("udf"), col("A"), col("B")))

您还可以使用源代码库来获得更高级的处理方式:

scala get function name that was sent as param

关于java - Spark 从列中获取udf名称并执行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58284919/

相关文章:

java - 有多种方法做同样的事情

python - 在 PySpark 中展平 RDD

scala - 在 Spark Json 到 Csv 转换中?

scala - 如何将字符串连接到 Spark 中的列?

scala - Spark SQL - 在更新一个列时选择所有列

java - 如何向main添加命令参数? java

java - Http URL 连接无法处理重定向

java - 打印给定数字中的最大数字 - Java

python - spark posexplode 函数运行速度很慢

python - Pyspark 中的中位数和分位数值