scala - 如何使用 Scala(spark) 逐行读取文本文件并使用分隔符拆分并将值存储在各个列中?

标签 scala apache-spark

<分区>

我是 Scala 新手。

我的要求是我需要逐行读取并将其拆分为特定的分隔符并提取值以放入不同文件的相应列中。

下面是我的输入示例数据:

ABC Log

Aug 10 14:36:52 127.0.0.1 CEF:0|McAfee|ePolicy Orchestrator|IFSSLCRT0.5.0.5/epo4.0|2410|DeploymentTask|High  eventId=34 externalId=23
Aug 10 15:45:56 127.0.0.1 CEF:0|McAfee|ePolicy Orchestrator|IFSSLCRT0.5.0.5/epo4.0|2890|DeploymentTask|Medium eventId=888 externalId=7788
Aug 10 16:40:59 127.0.0.1 CEF:0|NV|ePolicy Orchestrator|IFSSLCRT0.5.0.5/epo4.0|2990|DeploymentTask|Low eventId=989 externalId=0004


XYZ Log

Aug 15 14:32:15 142.101.36.118 cef[10612]: CEF:0|fire|cc|3.5.1|FireEye Acquisition Started
Aug 16 16:45:10 142.101.36.189 cef[10612]: CEF:0|cold|dd|3.5.4|FireEye Acquisition Started
Aug 18 19:50:20 142.101.36.190 cef[10612]: CEF:0|fire|ee|3.5.6|FireEye Acquisition Started

在上面的数据中,我需要阅读“ABC 日志”标题下的第一部分,并从每一行中提取值并将其放在相应的列下。这里有几个第一个值的列名称是硬编码的,最后一列我需要通过拆分来提取“="即 eventId=34 externalId=23 => col = eventId value = 34 和 col = value = externalId

Column names 

date time ip_address col1 col2 col3 col4 col5

我想要如下输出:

这是第一部分“ABC 日志”并将其放入一个文件中,其余部分相同。

 date    time     ip_address  col1   col2    col3          col4      col5 col6                            col7  
 Aug 10  14:36:52 127.0.0.1   CEF:0  McAfee   ePolicy Orchestrator IFSSLCRT0.5.0.5/epo4.0 2410 DeploymentTask High

Aug 10 15:45:56 127.0.0.1 CEF:0 McAfee ePolicy Orchestrator IFSSLCRT0.5.0.5/epo4.0 2890 DeploymentTask Medium

下面的代码我一直在尝试:

package AV_POC_Parsing
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.log4j.Logger

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

// For implicit conversions like converting RDDs to DataFrames

//import org.apache.spark.implicits._

//import spark.implicits._


object scala {

   def main(args: Array[String]) {

  // create Spark context with Spark configuration
    val sc = new SparkContext(new SparkConf().setAppName("AV_Log_Processing").setMaster("local[*]"))

    // Read text file in spark RDD 

    val textFile = sc.textFile("input.txt");


    val splitRdd = textFile.map( line => line.split(" "))
    // RDD[ Array[ String ]


    // printing values
    splitRdd.foreach { x => x.foreach { y => println(y) } }

   // how to store split values in different column and write it into file

}}

如何在 Scala 中拆分两个分隔符。

谢谢

最佳答案

也许对你有帮助。

import org.apache.spark.{SparkConf, SparkContext}

object DataFilter {

  def main(args: Array[String]): Unit = {

    // create Spark context with Spark configuration
    val sc = new SparkContext(new SparkConf().setAppName("AV_Log_Processing").setMaster("local[*]"))

    // Read text file in spark RDD
    val textFile = sc.textFile("input.txt");

    val splitRdd = textFile.map { s =>
      val a = s.split("[ |]")
      val date = Array(a(0) + " " + a(1))
      (date ++ a.takeRight(10)).mkString("\t")
    }
    // RDD[ Array[ String ]


    // printing values
    splitRdd.foreach(println)

    // how to store split values in different column and write it into file
  }
}

关于scala - 如何使用 Scala(spark) 逐行读取文本文件并使用分隔符拆分并将值存储在各个列中?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46345620/

相关文章:

scala - Intellij IDEA : Why build. sbt 以红色突出显示?

scala - Sbt:如何为所有项目定义任务?

python - 当列表值与 Pyspark 数据框中列值的子字符串匹配时填充新列

java - 运行spark程序出错 "java.lang.IllegalArgumentException: Unsupported type: com.spark.example.main.App$Product"

scala - Intellij Idea 和 Play Framework 中的库导入问题

scala - 编解码器解码/编码分割长度字段

scala - 我们如何对 Spark RDD 中的数据进行排序和分组?

apache-spark - 在 YARN 上以集群模式运行 Spark 应用程序时不使用 SparkConf 设置

scala - Spark scala 将 Unix 时间转换为时间戳失败

apache-spark - 为推荐引擎建模隐式和显式行为数据