我的文件夹中有 3 个日志文件。 喜欢
foldera = emplog,deptlog,companylog
folderb = emplog,deptlog,companylog
folderc = emplog,deptlog,companylog
我有 3 个 diff scala 程序文件来从每个文件中提取数据。
employee.scala
department.scala
companylog.scala
每个代码都如下所示。
我想组合所有这些文件并以并行方式执行它们。
package com.sample
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD.rddToPairRDDFunctions
import org.apache.spark.sql.SQLContext
import org.apache.log4j.{Level, Logger}
object logparser {
def main(args: Array[String]) = {
Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("akka").setLevel(Level.OFF)
//Start the Spark context
val conf = new SparkConf()
.setAppName("Parser")
.setMaster("local")
val sc = new SparkContext(conf)
val sqlContext= new SQLContext(sc)
val test = sc.wholeTextFiles("C:\\mkdir\\*\\*")
.map{l =>
if(l._1.endsWith("emplog.txt")){
empparser(l._2,sc,sqlContext)
}
l
}
.foreach{println}
}
def empparser(record:String,sc:SparkContext,sqlContext:SQLContext) = {
val emppattern="""[(](\d+)[)]\s([\w\s._]{30})\s+""".r
import sqlContext.implicits._
val indrecs = emppattern.findAllIn(record)
.map{ line =>
val emppattern(eid,ename) = line
(eid,ename)
}
.toSeq
.toDF("eid","ename")
.show()
}
}
我已经尝试过将每个方法附加到同一对象中的代码。
现在出现两个问题 Q1.当我编译时我得到
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
Serialization stack:
- object not serializable (class: org.apache.spark.SparkContext, value: org.apache.spark.SparkContext@6b0615ae)
- field (class: com.sample.logparser$$anonfun$1, name: sc$1, type: class org.apache.spark.SparkContext)
- object (class com.sample.logparser$$anonfun$1, <function1>)
据我所知(仅限新手)Spark 上下文无法序列化。如果我不传递 sc 作为参数,则会出现空指针异常。我该如何解决这个问题?
Q2:转换为 DF 后,我将在 empparser 方法中插入到 hive 表代码。一旦完成,我不想在我的主要范围内做任何事情。但除非我在那之后采取行动,否则我的 map 代码不会执行。这就是为什么我之后有 foreacch println 。有办法解决这个问题吗?
最佳答案
为了尝试回答这个问题,我假设处理员工或部门的结果会产生相同的类型记录。我希望每种数据的情况都不同,因此我单独处理不同类型的记录,以允许这种“根据现实进行调整”。
首先,我们定义一个记录案例类
和不同种类或记录类型的解析器。 (为了简单起见,我在这里复制相同的实现)
case class Record(id:String, name: String)
val empParser: String => Option[Record] = { record =>
val pattern="""^[(](\d+)[)]\s([\w\s._]{30})\s+$""".r
record match {
case pattern(eid,ename) => Some(Record(eid, ename))
case _ => None
}
}
val deptParser: String => Option[Record] = { record =>
val pattern="""^[(](\d+)[)]\s([\w\s._]{30})\s+$""".r
record match {
case pattern(eid,ename) => Some(Record(eid, ename))
case _ => None
}
}
val companyParser: String => Option[Record] = { record =>
val pattern="""^[(](\d+)[)]\s([\w\s._]{30})\s+$""".r
record match {
case pattern(eid,ename) => Some(Record(eid, ename))
case _ => None
}
}
我们使用wholeFiles
加载数据:
val dataPath = "/.../data/wholefiles/*/*"
val logFiles = sc.wholeTextFiles(dataPath)
然后,我们通过过滤文件来处理不同类型的记录,以获得我们需要的文件类型,并应用上面定义的解析器。请注意我们实际上是如何重复相同的过程的。这可以被抽象出来。
val empLogs = logFiles.filter{case (filename, content) => filename.endsWith("emplog.txt")}.flatMap{case (_,content) => content.split("\n").flatMap(line=> empParser(line))}
val deptLogs = logFiles.filter{case (filename, content) => filename.endsWith("deptlog.txt")}.flatMap{case (_,content) => content.split("\n").flatMap(line=> deptParser(line))}
val compLogs = logFiles.filter{case (filename, content) => filename.endsWith("companylog.txt")}.flatMap{case (_,content) => content.split("\n").flatMap(line=> companyParser(line))}
我们现在转换为 DataFrame
val empDF = empLogs.toDF
我们也可以对其他记录类型执行相同的操作。
这个过程中有足够的空间来减少代码重复,这取决于我们是否能在不同数据类型的过程中找到共性。
关于scala - 如何在spark中为diff文件名调用单独的逻辑,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41206979/