apache-spark - 如何使用UDF返回多列?

标签 apache-spark apache-spark-sql

是否可以创建一个返回列集的 UDF?

即具有如下数据框:

| Feature1 | Feature2 | Feature 3 |
| 1.3      | 3.4      | 4.5       |

现在我想提取一个新特征,它可以被描述为两个元素的向量(例如,如线性回归中所示 - 斜率和偏移量)。所需的数据集应如下所示:

| Feature1 | Feature2 | Feature 3 | Slope | Offset |
| 1.3      | 3.4      | 4.5       | 0.5   | 3      |

是否可以使用单个 UDF 创建多个列,或者我是否需要遵循以下规则:“每个 UDF 一列”?

最佳答案

结构体方法

您可以将udf函数定义为

def myFunc: (String => (String, String)) = { s => (s.toLowerCase, s.toUpperCase)}

import org.apache.spark.sql.functions.udf
val myUDF = udf(myFunc)

并使用.*作为

val newDF = df.withColumn("newCol", myUDF(df("Feature2"))).select("Feature1", "Feature2", "Feature 3", "newCol.*")

我已从 udf 函数返回 Tuple2 用于测试目的(可以根据需要的多列数使用更高阶的元组),它将被视为 结构列。然后您可以使用 .* 选择单独列中的所有元素,最后重命名它们。

您的输出应该为

+--------+--------+---------+---+---+
|Feature1|Feature2|Feature 3|_1 |_2 |
+--------+--------+---------+---+---+
|1.3     |3.4     |4.5      |3.4|3.4|
+--------+--------+---------+---+---+

您可以重命名_1_2

数组方法

udf 函数应返回一个数组

def myFunc: (String => Array[String]) = { s => Array("s".toLowerCase, s.toUpperCase)}

import org.apache.spark.sql.functions.udf
val myUDF = udf(myFunc)

您可以选择数组的元素并使用别名来重命名它们

val newDF = df.withColumn("newCol", myUDF(df("Feature2"))).select($"Feature1", $"Feature2", $"Feature 3", $"newCol"(0).as("Slope"), $"newCol"(1).as("Offset"))

你应该有

+--------+--------+---------+-----+------+
|Feature1|Feature2|Feature 3|Slope|Offset|
+--------+--------+---------+-----+------+
|1.3     |3.4     |4.5      |s    |3.4   |
+--------+--------+---------+-----+------+

关于apache-spark - 如何使用UDF返回多列?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48979440/

相关文章:

apache-spark - 将文件保存到 Parquet 时,分区列移动到行尾

dataframe - Spark : Iterating through columns in each row to create a new dataframe

Pyspark - Dataframe 上的深度优先搜索

sql - 如何在 FROM 语句上使用 header 加载 SparkSQL

dataframe - 可空字段在写入 Spark Dataframe 时发生更改

python-2.7 - 运行 PySpark 时出错,无法连接到 master

apache-spark - 复杂类型的模式演化

scala - 如何检查点数据帧?

apache-spark - Oozie shell 操作为 Spark 作业抛出 NullPointerException

scala - 为什么 apache spark 中的这两个阶段计算的是同一件事?