斯卡拉 Spark : how to use dataset for a case class with the schema has snake_case?

标签 scala apache-spark apache-spark-dataset

我有以下案例类:

case class User(userId: String)

以及以下架构:

+--------------------+------------------+
|            col_name|         data_type|
+--------------------+------------------+
|             user_id|            string|
+--------------------+------------------+

当我尝试使用 spark.read.table("MyTable").as[User] 将 DataFrame 转换为类型化的 Dataset[User],我收到字段名称不匹配的错误:

Exception in thread "main" org.apache.spark.sql.AnalysisException:
    cannot resolve ''`user_id`' given input columns: [userId];;

有没有什么简单的方法可以解决这个问题,而不破坏scala惯用语并命名我的字段user_id?当然,我的真实表有更多字段,并且我有更多案例类/表,因此为每个案例类手动定义 Encoder 是不可行的(而且我不足够了解宏,所以这是不可能的;尽管如果存在这样的宏,我很乐意使用它!)。

我觉得我错过了一个非常明显的“将snake_case转换为camelCase=true”选项,因为我使用过的几乎所有ORM中都存在这个选项。

最佳答案

scala> val df = Seq(("Eric" ,"Theodore", "Cartman"), ("Butters", "Leopold", "Stotch")).toDF.select(concat($"_1", lit(" "), ($"_2")) as "first_and_middle_name", $"_3" as "last_name")
df: org.apache.spark.sql.DataFrame = [first_and_middle_name: string, last_name: string]

scala> df.show
+---------------------+---------+
|first_and_middle_name|last_name|
+---------------------+---------+
|        Eric Theodore|  Cartman|
|      Butters Leopold|   Stotch|
+---------------------+---------+


scala> val ccnames = df.columns.map(sc => {val ccn = sc.split("_")
    | (ccn.head +: ccn.tail.map(_.capitalize)).mkString
    | })
ccnames: Array[String] = Array(firstAndMiddleName, lastName)

scala> df.toDF(ccnames: _*).show
+------------------+--------+
|firstAndMiddleName|lastName|
+------------------+--------+
|     Eric Theodore| Cartman|
|   Butters Leopold|  Stotch|
+------------------+--------+

编辑:这有帮助吗?定义一个接受 loader: String => DataFrame 和 path: String 的函数。

scala> val parquetloader = spark.read.parquet _
parquetloader: String => org.apache.spark.sql.DataFrame = <function1>

scala> val tableloader = spark.read.table _
tableloader: String => org.apache.spark.sql.DataFrame = <function1>

scala> val textloader = spark.read.text _
textloader: String => org.apache.spark.sql.DataFrame = <function1>

// csv loader and others

def snakeCaseToCamelCaseDataFrameColumns(path: String, loader: String => DataFrame): DataFrame = {
  val ccnames = loader(path).columns.map(sc => {val ccn = sc.split("_")
    (ccn.head +: ccn.tail.map(_.capitalize)).mkString
    })
  df.toDF(ccnames: _*)
}

scala> :paste
// Entering paste mode (ctrl-D to finish)

def snakeCaseToCamelCaseDataFrameColumns(path: String, loader: String => DataFrame): DataFrame = {
      val ccnames = loader(path).columns.map(sc => {val ccn = sc.split("_")
        (ccn.head +: ccn.tail.map(_.capitalize)).mkString
        })
      df.toDF(ccnames: _*)
    }

// Exiting paste mode, now interpreting.

snakeCaseToCamelCaseDataFrameColumns: (path: String, loader: String => org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame

val oneDF = snakeCaseToCamelCaseDataFrameColumns(tableloader("/path/to/table"))
val twoDF = snakeCaseToCamelCaseDataFrameColumns(parquetloader("/path/to/parquet/file"))

关于斯卡拉 Spark : how to use dataset for a case class with the schema has snake_case?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49853104/

相关文章:

按名称访问的 Scala 元素集合

Scala 用于理解 Future、List 和 Option

scala - 计算 Spark (Scala) 中数据框列中的空值

scala - Spark 错误 : Unable to find encoder for type stored in a Dataset

java - 如何将 Dataset<Tuple2<String,DeviceData>> 转换为 Iterator<DeviceData>

scala - 在 Scala REPL 中设置包

scala - 如何从cats State创建cats IO monad

java - 在 Spark JavaRDD 转换中使用可序列化的 lambda

scala - Spark - 连接后如何避免重复列?

java - 如何使用 JAVA API 在 Spark 中使用另一个具有相同架构的数据集 <Row> 的记录来更新数据集 <Row> ?