scala - 更改多个 Spark DataFrame 列类型,动态且可配置

标签 scala apache-spark-sql

我是 Spark 和 Scala 的新手。

我们有一个外部数据源为我们提供 JSON。此 JSON 包含所有值的引号,包括数字和 bool 字段。所以当我将它放入我的 DataFrame 时,所有列都是字符串。最终目标是将这些 JSON 记录转换为正确类型的 Parquet 文件。

大约有 100 个字段,我需要将其中几个类型从字符串更改为 int、boolean 或 bigint(long)。此外,我们处理的每个 DataFrame 将只有这些字段的一个子集,而不是所有字段。因此,我需要能够处理给定 DataFrame 的列子集,将每一列与已知的列类型列表进行比较,并根据 DataFrame 中出现的列将某些列从字符串转换为 int、bigint 和 boolean。

最后,我需要列类型列表是可配置的,因为我们将来会有新的列,并且可能想要删除或更改旧列。

所以,这是我目前所拥有的:

// first I convert to all lower case for column names
val df = dfIn.toDF(dfIn.columns map(_.toLowerCase): _*)

// Big mapping to change types
// TODO how would I make this configurable?
// I'd like to drive this list from an external config file.
val dfOut = df.select(
   df.columns.map {

     ///// Boolean
     case a @ "a" => df(a).cast(BooleanType).as(a)
     case b @ "b" => df(b).cast(BooleanType).as(b)

     ///// Integer
     case i @ "i" => df(i).cast(IntegerType).as(i)
     case j @ "j" => df(j).cast(IntegerType).as(j)


     // Bigint to Double
     case x @ "x" => df(x).cast(DoubleType).as(x)
     case y @ "y" => df(y).cast(DoubleType).as(y)

     case other         => df(other)
   }: _*
)

这是将这些数据转换为我在 Scala 中想要的类型的一种有效方式吗?

我可以使用一些关于如何从我可以定义列类型的外部“配置”文件中解决这个问题的建议。

最佳答案

我的问题演变成了这个问题。那里给出了很好的答案:

Spark 2.2 Scala DataFrame select from string array, catching errors

关于scala - 更改多个 Spark DataFrame 列类型,动态且可配置,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47520650/

相关文章:

scala - 如何从 AnyRef 对象获取反射的运行时方法?

scala - 提升 RewriteResponse 未找到有效的 url

scala - 在 Amazon Keyspaces 上的 Cassandra 表中写入数据帧时出错

scala - scala.js 示例应用程序中的 "Cannot find an implicit ExecutionContext"错误。

java - 在 Spark SQL 中加载 JDBC 表时数据不正确

apache-spark - Spark SQL 中的共同分区联接

hive - 指定分区时,Spark SQL saveAsTable 与 Hive 不兼容

Scala 隐式转换在某些条件下应用,但在其他条件下不应用

apache-spark - 如何使用 Dataset API 使用序数(例如 SQL 的 'GROUP BY 1' 或 'ORDER BY 2' )?

apache-spark - 写Delta Lake时使用分区(配合partitionBy)没有效果