scala - 如何将数组或向量列分成多列?

标签 scala apache-spark

假设我生成了一个 Spark Dataframe:

val df = Seq(
    (Array(1, 2, 3), Array("a", "b", "c")),
    (Array(1, 2, 3), Array("a", "b", "c"))
).toDF("Col1", "Col2")

可以使用以下内容提取 "Col1" 中第一个索引处的元素:

val extractFirstInt = udf { (x: Seq[Int], i: Int) => x(i) }
df.withColumn("Col1_1", extractFirstInt($"Col1", lit(1)))

类似地,第二列“Col2”,例如

val extractFirstString = udf { (x: Seq[String], i: Int) => x(i) }
df.withColumn("Col2_1", extractFirstString($"Col2", lit(1)))

但是代码重复有点难看——我需要为每个底层元素类型提供一个单独的 UDF。

有没有办法编写通用 UDF,自动推断 Spark 数据集列中底层数组的类型?例如。我希望能够编写类似 (pseudocode; with generic T)

的内容
val extractFirst = udf { (x: Seq[T], i: Int) => x(i) }
df.withColumn("Col1_1", extractFirst($"Col1", lit(1)))

以某种方式,类型 T 会被 Spark/Scala 编译器自动推断出来(如果合适的话,可能会使用反射)。

如果您知道同时适用于数组列和 Spark 自己的 DenseVector/SparseVector 类型的解决方案,那就加分了。我想避免的主要事情(如果可能的话)是为我想要处理的每个底层数组元素类型定义单独的 UDF 的要求。

最佳答案

也许frameless可能是一个解决方案吗?

由于操作数据集需要针对给定类型的编码器,因此您必须预先定义类型,以便 Spark SQL 可以为您创建一个类型。我认为使用 Scala 宏来生成各种编码器支持的类型在这里是有意义的。

到目前为止,我会为每种类型定义一个泛型方法和一个 UDF(这违背了您想要找到一种方法来拥有“泛型 UDF”的愿望,该方法会自动推断出底层数组的类型Spark 数据集的列“)。

def myExtract[T](x: Seq[T], i: Int) = x(i)
// define UDF for extracting strings
val extractString = udf(myExtract[String] _)

使用如下:

val df = Seq(
    (Array(1, 2, 3), Array("a", "b", "c")),
    (Array(1, 2, 3), Array("a", "b", "c"))
).toDF("Col1", "Col2")

scala> df.withColumn("Col1_1", extractString($"Col2", lit(1))).show
+---------+---------+------+
|     Col1|     Col2|Col1_1|
+---------+---------+------+
|[1, 2, 3]|[a, b, c]|     b|
|[1, 2, 3]|[a, b, c]|     b|
+---------+---------+------+

您可以改为探索Dataset(不是DataFrame,即Dataset[Row])。这将为您提供所有类型机制(也许您可以避免任何宏开发)。

关于scala - 如何将数组或向量列分成多列?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43824410/

相关文章:

apache-spark - 启动 Hive 时出现此错误 - log4j :ERROR Could not instantiate class [org. apache.hadoop.hive.shims.HiveEventCounter]

unit-testing - 从 reduceByKey() 调用函数时单元测试期间的导入错误

hadoop - Spark 应用程序继续运行并且似乎挂起 - org.apache.spark.sql.hive.thriftserver.HiveThriftServer2

scala - Kestrel 函数式编程设计模式的附加值(value)是什么? (斯卡拉)

scala按类型过滤

java - WatchService 在集成测试中不起作用

apache-spark - 如何在不执行 Spark SQL 表达式的情况下验证它?

scala - 如何使用申请功能申请

multithreading - Play 框架 future 没有被默认调度程序并行化

java - spark与kafka集成,Spark异常-提交jar