scala - Apache Spark 使用管道分隔的 CSV 文件

标签 scala apache-spark apache-spark-sql

我对 Apache Spark 非常陌生,并且正在尝试将 SchemaRDD 与我的管道分隔文本文件一起使用。我在 Mac 上使用 Scala 10 独立安装了 Spark 1.5.2。我有一个包含以下代表性数据的 CSV 文件,我试图根据记录的第一个值(列)将以下内容拆分为 4 个不同的文件.我非常感谢我能得到的任何帮助。

1|1.8|20140801T081137|115810740
2|20140714T060000|335|22159892|3657|0.00|||181
2|20140714T061500|335|22159892|3657|0.00|||157
2|20140714T063000|335|22159892|3657|0.00|||156
2|20140714T064500|335|22159892|3657|0.00|||66
2|20140714T070000|335|22159892|3657|0.01|||633
2|20140714T071500|335|22159892|3657|0.01|||1087
3|34|Starz
3|35|VH1
3|36|CSPAN: Cable Satellite Public Affairs Network
3|37|Encore
3|278|CMT: Country Music Television
3|281|Telehit
4|625363|1852400|Matlock|9212|The Divorce
4|625719|1852400|Matlock|16|The Rat Pack
4|625849|1846952|Smallville|43|Calling

最佳答案

注意:您的 csv 文件每行中的字段数不同 - 这无法按原样解析为 DataFrame。 (SchemaRDD 已重命名为 DataFrame。)如果您的 csv 文件格式正确,您可以执行以下操作:

使用 --packages com.databricks:spark-csv_2.10:1.3.0 启动 spark-shell 或 spark-submit 以便轻松解析 csv 文件( see here )。在 Scala 中,您的代码将是,假设您的 csv 文件有一个标题 - 如果是,则更容易引用列:

val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").option("delimiter", '|').load("/path/to/file.csv")
// assume 1st column has name col1
val df1 = df.filter( df("col1") === 1)  // 1st DataFrame
val df2 = df.filter( df("col1") === 2)  // 2nd DataFrame  etc... 

由于您的文件格式不正确,您必须以不同方式解析每一行,例如,执行以下操作:
val lines = sc.textFile("/path/to/file.csv")

case class RowRecord1( col1:Int, col2:Double, col3:String, col4:Int)
def parseRowRecord1( arr:Array[String]) = RowRecord1( arr(0).toInt, arr(1).toDouble, arr(2), arr(3).toInt)

case class RowRecord2( col1:Int, col2:String, col3:Int, col4:Int, col5:Int, col6:Double, col7:Int)
def parseRowRecord2( arr:Array[String]) = RowRecord2( arr(0).toInt, arr(1), arr(2).toInt, arr(3).toInt, arr(4).toInt, arr(5).toDouble, arr(8).toInt)

val df1 = lines.filter(_.startsWith("1")).map( _.split('|')).map( arr => parseRowRecord1( arr )).toDF
val df2 = lines.filter(_.startsWith("2")).map( _.split('|')).map( arr => parseRowRecord2( arr )).toDF

关于scala - Apache Spark 使用管道分隔的 CSV 文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34246499/

相关文章:

scala - Akka Streams Unzip/Zip 是否保留顺序?

apache-spark - spark.conf.set ("spark.driver.maxResultSize", '6g' ) 没有更新默认值 - PySpark

apache-spark - 为什么在完成作业和关闭 Spark 之间会发生磁盘繁忙尖峰?

apache-spark - 进行更新插入时,Spark Hudi Job 中的记录键中超过 1 列

java - 如何在scala中使用java代理

scala - Spark : write Paquet from heterogeneous data

scala - 为什么scala.collection中的traits可以创建实例?

scala - 使用 Spark 和 IntelliJ 时出现 NoSuchMethodError

java - 使用 Marathon 运行 Spark 作业

java - 如何在不使用 Spark SQL 的情况下对 Spark 中的数据帧进行排序?