我可以从数据库中加载数据,并对这些数据进行一些处理。
问题是某些表的日期列为“字符串”,而另一些表将其列为“时间戳”。
在加载数据之前,我不知道日期列的类型。
> x.getAs[String]("date") // could be error when date column is timestamp type
> x.getAs[Timestamp]("date") // could be error when date column is string type
这就是我从Spark加载数据的方式。
spark.read
.format("jdbc")
.option("url", url)
.option("dbtable", table)
.option("user", user)
.option("password", password)
.load()
有什么办法可以使他们在一起吗?或始终将其转换为字符串?
最佳答案
您可以对列的类型进行模式匹配(使用DataFrame的模式),以决定是将String解析为Timestamp还是仅按原样使用Timestamp-并使用unix_timestamp
函数进行实际的转换:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StringType
// preparing some example data - df1 with String type and df2 with Timestamp type
val df1 = Seq(("a", "2016-02-01"), ("b", "2016-02-02")).toDF("key", "date")
val df2 = Seq(
("a", new Timestamp(new SimpleDateFormat("yyyy-MM-dd").parse("2016-02-01").getTime)),
("b", new Timestamp(new SimpleDateFormat("yyyy-MM-dd").parse("2016-02-02").getTime))
).toDF("key", "date")
// If column is String, converts it to Timestamp
def normalizeDate(df: DataFrame): DataFrame = {
df.schema("date").dataType match {
case StringType => df.withColumn("date", unix_timestamp($"date", "yyyy-MM-dd").cast("timestamp"))
case _ => df
}
}
// after "normalizing", you can assume date has Timestamp type -
// both would print the same thing:
normalizeDate(df1).rdd.map(r => r.getAs[Timestamp]("date")).foreach(println)
normalizeDate(df2).rdd.map(r => r.getAs[Timestamp]("date")).foreach(println)
关于scala - Spark,Scala-列类型确定,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41341724/