scala - 连接 DataFrame 列的数组元素

标签 scala apache-spark apache-spark-sql

我有一个 Dataframe df1,格式如下:

+--------------------------+
|DateInfos                 |
+--------------------------+
|[[3, A, 111], [4, B, 222]]|
|[[1, C, 333], [2, D, 444]]|
|[[5, E, 555]]             |
+--------------------------+

我想用分隔符“-”(df2)连接每个 tuples3 的第二个和第三个元素:

+------------------------+
|DateInfos               |
+------------------------+
|[[3, A-111], [4, B-222]]|
|[[1, C-333], [2, D-444]]|
|[[5, E-555]]            |
+------------------------+

我打印 df1 的架构:

root
 |-- DateInfos: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- _1: integer (nullable = false)
 |    |    |-- _2: string (nullable = true)
 |    |    |-- _3: string (nullable = true)

我假设我必须创建一个 udf,它使用具有以下签名的函数:

def concatDF1(array: Array[(Int, String, String)]): Array[(Int, String)] = {
   val res = Array.map(elem => (elem._1, elem._2 + "-" + elem._3)).toArray
   res
}

我执行这样的方法:

val concat_udf = sqlContext.udf.register("concat_udf", concat _)
val df2_temp = df1.withColumn("DataInfos_temp",concat_udf(df1("DataInfos")))
val df2 = df2_temp.drop("DataInfos").withColumnRenamed("DataInfos_temp", "DataInfos")

我收到此错误:

Caused by: org.apache.spark.SparkException: Failed to execute user defined function(anonfun$4: (array<struct<_1:int,_2:string,_3:string>>) => array<struct<_1:int,_2:string>>)

你有什么想法吗?

最佳答案

这应该可以完成工作:

import org.apache.spark.sql._
import org.apache.spark.sql.functions._

val sparkSession = ...
import sparkSession.implicits._

val input = sc.parallelize(Seq(
  Seq((3, "A", 111), (4, "B", 222)),
  Seq((1, "C", 333), (2, "D", 444)),
  Seq((5, "E", 555))
)).toDF("DateInfos")

val concatElems = udf { seq: Seq[Row] =>
  seq.map { case Row(x: Int, y: String, z: Int) => 
    (x, s"$y-$z")
  }
}

val output = input.select(concatElems($"DateInfos").as("DateInfos"))

output.show(truncate = false)

哪些输出:

+----------------------+
|DateInfos             |
+----------------------+
|[[3,A-111], [4,B-222]]|
|[[1,C-333], [2,D-444]]|
|[[5,E-555]]           |
+----------------------+

关于scala - 连接 DataFrame 列的数组元素,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40576135/

相关文章:

hadoop - 并发追加到Spark中的hdfs文件

python - 在 pyspark 中找不到 col 函数

scala - 在窗口 apache Spark 中将滞后与行计算相结合

java - 使用 Spring 作为 play 2.4.x 的依赖注入(inject)框架?

apache-spark - ALS 模型 - 如何生成 full_u * v^t * v?

scala - 转发集合的最佳实践

scala - 从mapValues或flatMapValues访问 key ?

scala - 如何获取数据框中每一行每一列的值和类型?

scala - 初始化val可能会引发异常

Scala IDE运行出现错误 "An error has occured. See the log file ...\.metaspace\.log."