scala - 在 Zeppelin 和 Spark 中解析 CSV 中的日期时间信息

标签 scala csv datetime apache-spark

我正在尝试读取 CSV 文件并构建数据框。

CSV的格式如下。我使用 ISO8602 日期/时间格式来表示数据/时间字符串。

2015-6-29T12:0:0,b82debd63cffb1490f8c9c647ca97845,G1J8RX22EGKP,2015-6-29T12:0:5,2015-6-29T12:0:6,0QA97RAM1GIV,2015-6-29T12:0:10,2015-6-29T12:0:11,2015-6-29T12:0:12,2015-6-29T12:5:42,1
2015-6-29T12:20:0,0d60c871bd9180275f1e4104d4b7ded0,5HNB7QZSUI2C,2015-6-29T12:20:5,2015-6-29T12:20:6,KSL2LB0R6367,2015-6-29T12:20:10,2015-6-29T12:20:11,2015-6-29T12:20:12,2015-6-29T12:25:13,1
......

为了加载这些数据,我在 Zeppelin 中编写了如下的 scala 代码

import org.apache.spark.sql.types.DateType
import org.apache.spark.sql.functions._
import org.joda.time.DateTime
import org.joda.time.format.DateTimeFormat
import sys.process._

val logCSV = sc.textFile ("log_table.csv")

case class record(
    callingTime:DateTime, 
    userID:String, 
    CID:String, 
    serverConnectionTime:DateTime, 
    serverResponseTime:DateTime, 
    connectedAgentID:String, 
    beginCallingTime:DateTime, 
    endCallingTime:DateTime, 
    Succeed:Int)


val formatter = DateTimeFormat.forPattern("yyyy-mm-dd'T'kk:mm:ss")

val logTable = logCSV.map(s => s.split(",") ).map(
    s => record(
            formatter.parseDateTime( s(0) ), 
            s(1),
            s(2),
            formatter.parseDateTime( s(3) ), 
            formatter.parseDateTime( s(4) ), 
            s(5),
            formatter.parseDateTime( s(6) ), 
            formatter.parseDateTime( s(7) ),            
            s(8).toInt
        )
).toDF()

它犯了如下错误。主要问题是 DateTime 不可序列化。

logCSV: org.apache.spark.rdd.RDD[String] = log_table.csv MapPartitionsRDD[38] at textFile at <console>:169
defined class record
formatter: org.joda.time.format.DateTimeFormatter = org.joda.time.format.DateTimeFormatter@46051d99
org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1623)
    at org.apache.spark.rdd.RDD.map(RDD.scala:286)

然后我想知道如何在 Scala 中处理日期/时间信息。你可以帮帮我吗?

最佳答案

虽然DateTime如果您使用 parseMillis 则不可序列化方法DateTimeFormatter ,您将得到一个 long,它是免费桥接到 Long(可序列化)的。要从 Long 返回 DateTime,请使用 DateTime(longInstance.longValue()) 构造函数。

关于scala - 在 Zeppelin 和 Spark 中解析 CSV 中的日期时间信息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31131284/

相关文章:

java - I/O函数式编程、java编程

scala - 通过 Spark 读取文件夹中保存的所有 Parquet 文件

reflection - Scala 枚举和反射

c# - 获取两次之间的时间,时间跨度为一小时

sql - 如何为SQL设置时区?

scala - 无法在 IntelliJ + sbt-idea-plugin 中调试 Scala 应用程序

python - 使用 DictReader 从 csv 读取特定列

r - 在 csv 文件中写入和加载表达式集

python - 从最后一行读取csv文件

java - Java系统毫秒是否考虑闰秒?