java - 在 Spark Scala 中处理微秒

标签 java scala datetime apache-spark apache-spark-sql

我使用 Scala 将 PostgreSQL 表作为数据框导入到 spark 中。数据框看起来像

user_id | log_dt  
--------| -------    
96      | 2004-10-19 10:23:54.0    
1020    | 2017-01-12 12:12:14.931652

我正在将此数据帧转换为 log_dt 的数据格式为 yyyy-MM-dd hh:mm:ss.SSSSSS。为此,我使用以下代码使用 unix_timestamp 函数将 log_dt 转换为时间戳格式。

val tablereader1 = tablereader1Df.withColumn("log_dt",unix_timestamp(tablereader1Df("log_dt"),"yyyy-MM-dd hh:mm:ss.SSSSSS").cast("timestamp"))

当我使用命令 tablereader1.show() 打印 tablereader1 数据帧时,我得到以下结果

user_id | log_dt  
--------| -------
96      | 2004-10-19 10:23:54.0
1020    | 2017-01-12 12:12:14.0

如何将微秒保留为时间戳的一部分?任何建议表示赞赏。

最佳答案

date_format() 的毫秒数

您可以使用 Spark SQL date_format()它接受 Java SimpleDateFormat模式。 SimpleDateFormat 只能解析到毫秒模式“S”

import org.apache.spark.sql.functions._
import spark.implicits._ //to use $-notation on columns

val df = tablereader1Df.withColumn("log_dt", date_format($"log_dt", "S"))

更新:微秒与 Java 8 的 LocalDateTime

//Imports
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoField;

/* //Commented as per comment about IntelliJ
spark.udf.register("date_microsec", (dt: String) => 
   val dtFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.n")
   LocalDateTime.parse(dt, dtFormatter).getLong(ChronoField.MICRO_OF_SECOND)
)
*/

import org.apache.spark.sql.functions.udf

val date_microsec = udf((dt: String) => {
    val dtFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.n")
    LocalDateTime.parse(dt, dtFormatter).getLong(ChronoField.MICRO_OF_SECOND)
})

检查:help in building DateTimeFormatter pattern

使用 ChronoField.NANO_OF_SECOND 而不是 ChronoField.MICRO_OF_SECOND 在 UDF 中获取纳秒。

val df = tablereader1Df.withColumn("log_date_microsec", date_microsec($"log_dt"))

关于java - 在 Spark Scala 中处理微秒,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41879125/

相关文章:

scala - 如何定义具有未绑定(bind)类型参数的成员的案例类?

java - 将字符串转换为 am/pm 格式的日期和时间

java - 如何重新生成损坏的 GUI 表单?

java - TFS:在某人删除并重新添加而不是移动文件后如何恢复合并能力?

performance - Scala:可变对象与不可变对象(immutable对象)性能 - OutOfMemoryError

scala - Akka http 丢失发件人引用

mysql - 时间查询 - 查询一列大于另一列

Java 8 将 UTC 时间转换为 EDT/EST 以便日期保持不变

java - 命令设计模式在此用例中的应用?

java - Java中用于存储图形的数组