scala - Spark-SQL : How to read a TSV or CSV file into dataframe and apply a custom schema?

标签 scala apache-spark apache-spark-sql

我在处理制表符分隔值 (TSV) 和逗号分隔值 (CSV) 文件时使用 Spark 2.0。我想将数据加载到 Spark-SQL 数据帧中,在读取文件时我想完全控制架构。我不希望 Spark 从文件中的数据猜测架构。

如何将 TSV 或 CSV 文件加载到 Spark SQL Dataframe 中并向它们应用架构?

最佳答案

下面是加载制表符分隔值 (TSV) 文件并应用架构的完整 Spark 2.0 示例。

我正在使用Iris data set in TSV format from UAH.edu举个例子。以下是该文件的前几行:

Type    PW      PL      SW      SL
0       2       14      33      50
1       24      56      31      67
1       23      51      31      69
0       2       10      36      46
1       20      52      30      65

要强制实现架构,您可以使用以下两种方法之一以编程方式构建它:

A.使用 StructType 创建架构:

import org.apache.spark.sql.types._

var irisSchema = StructType(Array(
    StructField("Type",         IntegerType, true),
    StructField("PetalWidth",   IntegerType, true),
    StructField("PetalLength",  IntegerType, true),
    StructField("SepalWidth",   IntegerType, true),
    StructField("SepalLength",  IntegerType, true)
    ))

B.或者,使用 case classEncoders 创建架构(此方法不太冗长):

import org.apache.spark.sql.Encoders

case class IrisSchema(Type: Int, PetalWidth: Int, PetalLength: Int, 
                      SepalWidth: Int, SepalLength: Int)

var irisSchema = Encoders.product[IrisSchema].schema

创建架构后,您可以使用 spark.read 读取 TSV 文件。请注意,只要正确设置 option("delimiter", d) 选项,您实际上还可以读取逗号分隔值 (CSV) 文件或任何分隔文件。此外,如果您的数据文件有标题行,请务必设置 option("header", "true")

下面是完整的最终代码:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.Encoders

val spark = SparkSession.builder().getOrCreate()

case class IrisSchema(Type: Int, PetalWidth: Int, PetalLength: Int,
                      SepalWidth: Int, SepalLength: Int)

var irisSchema = Encoders.product[IrisSchema].schema

var irisDf = spark.read.format("csv").     // Use "csv" regardless of TSV or CSV.
                option("header", "true").  // Does the file have a header line?
                option("delimiter", "\t"). // Set delimiter to tab or comma.
                schema(irisSchema).        // Schema that was built above.
                load("iris.tsv")

irisDf.show(5)

这是输出:

scala> irisDf.show(5)
+----+----------+-----------+----------+-----------+
|Type|PetalWidth|PetalLength|SepalWidth|SepalLength|
+----+----------+-----------+----------+-----------+
|   0|         2|         14|        33|         50|
|   1|        24|         56|        31|         67|
|   1|        23|         51|        31|         69|
|   0|         2|         10|        36|         46|
|   1|        20|         52|        30|         65|
+----+----------+-----------+----------+-----------+
only showing top 5 rows

关于scala - Spark-SQL : How to read a TSV or CSV file into dataframe and apply a custom schema?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43508054/

相关文章:

apache-spark - 来自Docker的Ne​​o4j和Spark的ServiceUnavailableException

apache-spark - 在 YARN-Cluster 模式下运行时找不到 Hive 表

scala - 如何在 Scala Spark 中将稀疏向量转换为密集向量?

scala - 当返回类型为 Option[Error] 时处理快速失败失败

scala - 在 Scala json4s 中获取一个字段

scala - 克隆/深度复制 Spark 数据帧

python - 如何一次转换多个 Spark 数据框列类型?

android - 为什么 "Duplicate zip entry [classes.jar:android/support/v7/appcompat/R$styleable.class]"是 android-sdk-plugin?

java - Spark的Column.isin函数不带List

apache-spark - Databricks 的 Spark 外壳