scala - Spark,Scala-列类型确定

标签 scala apache-spark

我可以从数据库中加载数据,并对这些数据进行一些处理。
问题是某些表的日期列为“字符串”,而另一些表将其列为“时间戳”。

在加载数据之前,我不知道日期列的类型。

> x.getAs[String]("date") // could be error when date column is timestamp type
> x.getAs[Timestamp]("date") // could be error when date column is string type

这就是我从Spark加载数据的方式。
spark.read
              .format("jdbc")
              .option("url", url)
              .option("dbtable", table)
              .option("user", user)
              .option("password", password)
              .load()

有什么办法可以使他们在一起吗?或始终将其转换为字符串?

最佳答案

您可以对列的类型进行模式匹配(使用DataFrame的模式),以决定是将String解析为Timestamp还是仅按原样使用Timestamp-并使用unix_timestamp函数进行实际的转换:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StringType

// preparing some example data - df1 with String type and df2 with Timestamp type
val df1 = Seq(("a", "2016-02-01"), ("b", "2016-02-02")).toDF("key", "date")
val df2 = Seq(
  ("a", new Timestamp(new SimpleDateFormat("yyyy-MM-dd").parse("2016-02-01").getTime)),
  ("b", new Timestamp(new SimpleDateFormat("yyyy-MM-dd").parse("2016-02-02").getTime))
).toDF("key", "date")

// If column is String, converts it to Timestamp
def normalizeDate(df: DataFrame): DataFrame = {
  df.schema("date").dataType match {
    case StringType => df.withColumn("date", unix_timestamp($"date", "yyyy-MM-dd").cast("timestamp"))
    case _ => df
  }
}

// after "normalizing", you can assume date has Timestamp type - 
// both would print the same thing:
normalizeDate(df1).rdd.map(r => r.getAs[Timestamp]("date")).foreach(println)
normalizeDate(df2).rdd.map(r => r.getAs[Timestamp]("date")).foreach(println)

关于scala - Spark,Scala-列类型确定,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41341724/

相关文章:

apache-spark - Spark UDF中如何设置小数返回类型的精度和小数位数?

java - 如何使用java在Spark中并行处理文件的每一行?

scala - 根据一个 RDD 中的键过滤另一个 RDD

sql - where子句在spark sql数据框中不起作用

scala - 在 Scala 中获取随机元素的有效方法?

java - AWS EMR java sdk - withKeepJobFlowAliveWhenNoSteps

apache-spark - 通配符在 pyspark 数据框中不起作用

apache-spark - 如何查找 Spark 列是否包含特定值?

java - AWS EMR集群中的spark-submit “Error: No main class set in JAR”

json - 提高聚合跨多个 JSON 行的键值的性能