scala - How to add a null column to a complex array-of-struct column in Spark using a UDF

Tags: scala apache-spark user-defined-functions

I am trying to add an empty column to an embedded array[struct] column, so that I can transform a complex column like this:

  case class Additional(id: String, item_value: String)
  case class Element(income: String, currency: String, additional: Additional)
  case class Additional2(id: String, item_value: String, extra2: String)
  case class Element2(income: String, currency: String, additional: Additional2)

  // fx is an alias for org.apache.spark.sql.functions
  val my_uDF = fx.udf((data: Seq[Element]) => {
    data.map(x => Element2(x.income, x.currency, Additional2(x.additional.id, x.additional.item_value, null)))
  })
  sparkSession.sqlContext.udf.register("transformElements", my_uDF)
  val result = sparkSession.sqlContext.sql(
    "select transformElements(myElements), line_number, country, idate from entity where line_number = '1'")

The goal is to add an extra field named extra2 to Element.additional, so I map over the field with a UDF, but it fails with:

org.apache.spark.SparkException: Failed to execute user defined function(anonfun$1: (array<struct<income:string,currency:string,additional:struct<id:string,item_value:string>>>) => array<struct<income:string,currency:string,additional:struct<id:string,item_value:string,extra2:string>>>)

If I print the schema of the myElements field, it shows:

 |-- myElements: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- income: string (nullable = true)
 |    |    |-- currency: string (nullable = true)
 |    |    |-- additional: struct (nullable = true)
 |    |    |    |-- id: string (nullable = true)
 |    |    |    |-- item_value: string (nullable = true)

This is the schema I am trying to produce:

 |-- myElements: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- income: string (nullable = true)
 |    |    |-- currency: string (nullable = true)
 |    |    |-- additional: struct (nullable = true)
 |    |    |    |-- id: string (nullable = true)
 |    |    |    |-- item_value: string (nullable = true)
 |    |    |    |-- extra2: string (nullable = true)
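The UDF likely fails because Spark passes struct values to Scala UDFs as generic `Row` objects, so the input cannot be deserialized as `Seq[Element]`. Stripped of Spark, the intended transformation is just appending a null `extra2` to each nested struct; a minimal plain-Scala sketch of that mapping, using the question's case classes, would be:

```scala
// Case classes from the question (no Spark needed for the mapping itself).
case class Additional(id: String, item_value: String)
case class Element(income: String, currency: String, additional: Additional)
case class Additional2(id: String, item_value: String, extra2: String)
case class Element2(income: String, currency: String, additional: Additional2)

// The transformation the UDF is meant to perform: copy every field
// and add extra2 = null to the nested struct.
def transformElements(data: Seq[Element]): Seq[Element2] =
  data.map { e =>
    Element2(e.income, e.currency,
      Additional2(e.additional.id, e.additional.item_value, null))
  }

val in  = Seq(Element("150000", "EUR", Additional("001", "500EUR")))
val out = transformElements(in)
```

This only sketches the mapping logic; wiring it into Spark as a UDF would still require handling the `Row`-based input, which is what the accepted answer below sidesteps by using a Dataset.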

Best answer

Here is another approach that uses a Dataset instead of a DataFrame, giving direct access to the objects rather than working with Row. It adds an extra method, asElement2, which converts an Element into an Element2:

case class Additional2(id: String, item_value: String, extra2: String)
case class Element2(income: String, currency: String, additional2: Additional2)

case class Additional(id: String, item_value: String)
case class Element(income: String, currency: String, additional: Additional) {
  def asElement2: Element2 = {
    val additional2 = Additional2(additional.id, additional.item_value, null)
    Element2(income, currency, additional2)
  }
}

import spark.implicits._ // needed for toDS()

val df = Seq(
  Seq(Element("150000", "EUR", Additional("001", "500EUR"))),
  Seq(Element("50000", "CHF", Additional("002", "1000CHF")))
).toDS()

df.map { se =>
  se.map(_.asElement2)
}

// or even simpler
df.map(_.map(_.asElement2))

Output:

+-------------------------------+
|value                          |
+-------------------------------+
|[[150000, EUR, [001, 500EUR,]]]|
|[[50000, CHF, [002, 1000CHF,]]]|
+-------------------------------+

Final schema:

root
 |-- value: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- income: string (nullable = true)
 |    |    |-- currency: string (nullable = true)
 |    |    |-- additional2: struct (nullable = true)
 |    |    |    |-- id: string (nullable = true)
 |    |    |    |-- item_value: string (nullable = true)
 |    |    |    |-- extra2: string (nullable = true)
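Since `asElement2` is ordinary Scala, the conversion logic can be sanity-checked on plain collections, without a SparkSession (the case classes are reproduced here so the snippet is self-contained):

```scala
// Answer's case classes and asElement2, repeated for a standalone check.
case class Additional2(id: String, item_value: String, extra2: String)
case class Element2(income: String, currency: String, additional2: Additional2)

case class Additional(id: String, item_value: String)
case class Element(income: String, currency: String, additional: Additional) {
  def asElement2: Element2 =
    Element2(income, currency, Additional2(additional.id, additional.item_value, null))
}

val rows = Seq(
  Seq(Element("150000", "EUR", Additional("001", "500EUR"))),
  Seq(Element("50000", "CHF", Additional("002", "1000CHF")))
)

// Same shape as df.map(_.map(_.asElement2)), on plain Seq instead of a Dataset.
val converted = rows.map(_.map(_.asElement2))
```

The nested `extra2` comes out as null for every element, matching the trailing empty slot shown in the Spark output above.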

A similar question on adding null columns to complex array structs in Spark with a UDF can be found on Stack Overflow: https://stackoverflow.com/questions/56942683/
