scala - [Scala][Spark] : transform a column in dataframe, 保留其他列,使用 withColumn 和 map [错误:缺少参数类型]

标签 scala apache-spark types functional-programming explode

我有数据框 df

|            name| languagesAtSchool|currentState|
+----------------+------------------+------------+
|    James,,Smith|[Java, Scala, C++]|          CA|
|   Michael,Rose,|[Spark, Java, C++]|          NJ|
|Robert,,Williams|   [CSharp, VB, R]|          NV|
+----------------+------------------+------------+

我要


+----------------+--------+-----+
|Name            |language|State|
+----------------+--------+-----+
|James,,Smith    |Java    |CA   |
|James,,Smith    |Scala   |CA   |
|James,,Smith    |C++     |CA   |
|Michael,Rose,   |Spark   |NJ   |
|Michael,Rose,   |Java    |NJ   |
|Michael,Rose,   |C++     |NJ   |
|Robert,,Williams|CSharp  |NV   |
|Robert,,Williams|VB      |NV   |
|Robert,,Williams|R       |NV   |
+----------------+--------+-----+

我已经尝试过以下方法,效果非常好

val df2=df.flatMap(f=> f.getSeq[String](1).map((f.getString(0),_,f.getString(2))))
    .toDF("Name","language","State")

但我希望某些东西可以在不指定其他列的情况下保留,因此我尝试了

val df2 = df.withColumn("laguage", df.flatMap(f=>f.getSeq[String](1)))

然后它给出

Unknown Error: <console>:40: error: missing parameter type
       val df3 = df.withColumn("laguage", df.flatMap(f=>f.getSeq[String](1)))
                                                     ^

因此,我希望 Spark 中的某些内容可以转换列而不丢弃其他列。 我猜原因是 scala 无法确定类型,但我无法修复它。 我是 Scala 新手,感谢您的帮助!

最佳答案

explode 正是针对这种情况 - 它拆分数组列,因此列表中的每个元素都将位于单独的行中。

这是一个带有输出的完整示例:

package org.example

import org.apache.spark.sql._
import org.apache.spark.sql.functions.{col, explode}
import org.apache.spark.sql.types.{ArrayType, StringType, StructType}


object App {

  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder().appName("test").master("local[*]").getOrCreate()
    import spark.implicits._

    // create dataframe with test data
    val data = Seq(
      Row("James,,Smith", List("java", "scala"), "ca"),
      Row("Robert,,Williams", List("c", "c++"), "nv")
    )

    val schema = new StructType()
      .add("name", StringType)
      .add("languages", ArrayType(StringType))
      .add("current_state", StringType)

    val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)

    df.show(false)
//    +----------------+-------------+-------------+
//    |name            |languages    |current_state|
//    +----------------+-------------+-------------+
//    |James,,Smith    |[java, scala]|ca           |
//    |Robert,,Williams|[c, c++]     |nv           |
//    +----------------+-------------+-------------+

    // use explode to split the array values into different rows
    df.withColumn("language", explode(col("languages"))).drop("languages").show()

//    +----------------+-------------+--------+
//    |            name|current_state|language|
//    +----------------+-------------+--------+
//    |    James,,Smith|           ca|    java|
//    |    James,,Smith|           ca|   scala|
//    |Robert,,Williams|           nv|       c|
//    |Robert,,Williams|           nv|     c++|
//    +----------------+-------------+--------+

  }
}

关于scala - [Scala][Spark] : transform a column in dataframe, 保留其他列,使用 withColumn 和 map [错误:缺少参数类型],我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/69526132/

相关文章:

Java/Scala 获取两个 DateTime 对象之间差异的最佳方法作为 DateTime 列表

scala - 如何在 Scala 中找到两个日期时间之间的时差?

scala - 如何基于函数的type参数的类型参数编写具有多态返回类型的函数?

scala - Spark : Would a dataframe repartitioned to one node experience a shuffle when a groupBy is called on it?

C++ 错误 : "member Engine::x is not a type name"

c++ - __u8 和 uint8_t 之间的区别

c# - 将 C# 数据类型参数传递给用 C++ 编写的 dll?

java - 提取 DBPedia Dump 期间 DBPedia 提取框架失败

apache-spark - 在 Spark 中比较执行器之间的数据

scala - java.lang.IllegalArgumentException:无法解析查询:{“query”: