假设我生成了一个 Spark Dataframe:
val df = Seq(
(Array(1, 2, 3), Array("a", "b", "c")),
(Array(1, 2, 3), Array("a", "b", "c"))
).toDF("Col1", "Col2")
可以使用以下内容提取 "Col1"
中第一个索引处的元素:
val extractFirstInt = udf { (x: Seq[Int], i: Int) => x(i) }
df.withColumn("Col1_1", extractFirstInt($"Col1", lit(1)))
类似地,第二列“Col2”
,例如
val extractFirstString = udf { (x: Seq[String], i: Int) => x(i) }
df.withColumn("Col2_1", extractFirstString($"Col2", lit(1)))
但是代码重复有点难看——我需要为每个底层元素类型提供一个单独的 UDF。
有没有办法编写通用 UDF,自动推断 Spark 数据集列中底层数组的类型?例如。我希望能够编写类似 (pseudocode; with generic T
)
val extractFirst = udf { (x: Seq[T], i: Int) => x(i) }
df.withColumn("Col1_1", extractFirst($"Col1", lit(1)))
以某种方式,类型 T
会被 Spark/Scala 编译器自动推断出来(如果合适的话,可能会使用反射)。
如果您知道同时适用于数组列和 Spark 自己的 DenseVector
/SparseVector
类型的解决方案,那就加分了。我想避免的主要事情(如果可能的话)是为我想要处理的每个底层数组元素类型定义单独的 UDF 的要求。
最佳答案
也许frameless可能是一个解决方案吗?
由于操作数据集需要针对给定类型的编码器
,因此您必须预先定义类型,以便 Spark SQL 可以为您创建一个类型。我认为使用 Scala 宏来生成各种编码器支持的类型在这里是有意义的。
到目前为止,我会为每种类型定义一个泛型方法和一个 UDF(这违背了您想要找到一种方法来拥有“泛型 UDF”的愿望,该方法会自动推断出底层数组的类型Spark 数据集的列“)。
def myExtract[T](x: Seq[T], i: Int) = x(i)
// define UDF for extracting strings
val extractString = udf(myExtract[String] _)
使用如下:
val df = Seq(
(Array(1, 2, 3), Array("a", "b", "c")),
(Array(1, 2, 3), Array("a", "b", "c"))
).toDF("Col1", "Col2")
scala> df.withColumn("Col1_1", extractString($"Col2", lit(1))).show
+---------+---------+------+
| Col1| Col2|Col1_1|
+---------+---------+------+
|[1, 2, 3]|[a, b, c]| b|
|[1, 2, 3]|[a, b, c]| b|
+---------+---------+------+
您可以改为探索Dataset
(不是DataFrame
,即Dataset[Row]
)。这将为您提供所有类型机制(也许您可以避免任何宏开发)。
关于scala - 如何将数组或向量列分成多列?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43824410/