我正在尝试读取 CSV 文件并构建数据框。
CSV的格式如下。我使用 ISO8602 日期/时间格式来表示数据/时间字符串。
2015-6-29T12:0:0,b82debd63cffb1490f8c9c647ca97845,G1J8RX22EGKP,2015-6-29T12:0:5,2015-6-29T12:0:6,0QA97RAM1GIV,2015-6-29T12:0:10,2015-6-29T12:0:11,2015-6-29T12:0:12,2015-6-29T12:5:42,1
2015-6-29T12:20:0,0d60c871bd9180275f1e4104d4b7ded0,5HNB7QZSUI2C,2015-6-29T12:20:5,2015-6-29T12:20:6,KSL2LB0R6367,2015-6-29T12:20:10,2015-6-29T12:20:11,2015-6-29T12:20:12,2015-6-29T12:25:13,1
......
为了加载这些数据,我在 Zeppelin 中编写了如下的 scala 代码
import org.apache.spark.sql.types.DateType
import org.apache.spark.sql.functions._
import org.joda.time.DateTime
import org.joda.time.format.DateTimeFormat
import sys.process._
val logCSV = sc.textFile ("log_table.csv")
case class record(
callingTime:DateTime,
userID:String,
CID:String,
serverConnectionTime:DateTime,
serverResponseTime:DateTime,
connectedAgentID:String,
beginCallingTime:DateTime,
endCallingTime:DateTime,
Succeed:Int)
val formatter = DateTimeFormat.forPattern("yyyy-mm-dd'T'kk:mm:ss")
val logTable = logCSV.map(s => s.split(",") ).map(
s => record(
formatter.parseDateTime( s(0) ),
s(1),
s(2),
formatter.parseDateTime( s(3) ),
formatter.parseDateTime( s(4) ),
s(5),
formatter.parseDateTime( s(6) ),
formatter.parseDateTime( s(7) ),
s(8).toInt
)
).toDF()
它犯了如下错误。主要问题是 DateTime 不可序列化。
logCSV: org.apache.spark.rdd.RDD[String] = log_table.csv MapPartitionsRDD[38] at textFile at <console>:169
defined class record
formatter: org.joda.time.format.DateTimeFormatter = org.joda.time.format.DateTimeFormatter@46051d99
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1623)
at org.apache.spark.rdd.RDD.map(RDD.scala:286)
然后我想知道如何在 Scala 中处理日期/时间信息。你可以帮帮我吗?
最佳答案
虽然DateTime如果您使用 parseMillis 则不可序列化方法DateTimeFormatter ,您将得到一个 long,它是免费桥接到 Long(可序列化)的。要从 Long 返回 DateTime,请使用 DateTime(longInstance.longValue())
构造函数。
关于scala - 在 Zeppelin 和 Spark 中解析 CSV 中的日期时间信息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31131284/