scala - 如何在 Spark 中创建 Schema 文件

标签 scala apache-spark-sql schema orc

我正在尝试读取一个架构文件(它是一个文本文件)并将其应用于我的 CSV 文件而没有标题。由于我已经有一个架构文件,我不想使用 InferSchema这是一个开销的选项。

我的输入架构文件如下所示,

"num IntegerType","letter StringType"

我正在尝试使用以下代码来创建架构文件,
val schema_file = spark.read.textFile("D:\\Users\\Documents\\schemaFile.txt")
val struct_type = schema_file.flatMap(x => x.split(",")).map(b => (b.split(" ")(0).stripPrefix("\"").asInstanceOf[String],b.split(" ")(1).stripSuffix("\"").asInstanceOf[org.apache.spark.sql.types.DataType])).foreach(x=>println(x))

我收到如下错误
Exception in thread "main" java.lang.UnsupportedOperationException: No Encoder found for org.apache.spark.sql.types.DataType

- 字段(类:“org.apache.spark.sql.types.DataType”,名称:“_2”)
- 根类:“scala.Tuple2”

并在使用 spark.read.csv 时尝试将其用作模式文件像下面一样,把它写成一个 ORC 文件
  val df=spark.read
      .format("org.apache.spark.csv")
      .option("header", false)
      .option("inferSchema", true)
      .option("samplingRatio",0.01)
      .option("nullValue", "NULL")
      .option("delimiter","|")
      .schema(schema_file)
      .csv("D:\\Users\\sampleFile.txt")
      .toDF().write.format("orc").save("D:\\Users\\ORC")

需要帮助将文本文件转换为架构文件并将我的输入 CSV 文件转换为 ORC。

最佳答案

text 创建模式文件创建函数到match type并返回 DataType作为

def getType(raw: String): DataType = {
  raw match {
    case "ByteType" => ByteType
    case "ShortType" => ShortType
    case "IntegerType" => IntegerType
    case "LongType" => LongType
    case "FloatType" => FloatType
    case "DoubleType" => DoubleType
    case "BooleanType" => BooleanType
    case "TimestampType" => TimestampType
    case _ => StringType
  }
}

现在通过读取架构文件来创建架构
val schema = Source.fromFile("schema.txt").getLines().toList
  .flatMap(_.split(",")).map(_.replaceAll("\"", "").split(" "))
  .map(x => StructField(x(0), getType(x(1)), true))

现在将 csv 文件读取为
spark.read
  .option("samplingRatio", "0.01")
  .option("delimiter", "|")
  .option("nullValue", "NULL")
  .schema(StructType(schema))
  .csv("data.csv")

希望这可以帮助!

关于scala - 如何在 Spark 中创建 Schema 文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50500804/

相关文章:

具有自定义数据格式的 JavaFX DragAndDrop

mysql - 为什么需要 MySQL 数据库架构?

node.js - mongodb mongoose nodejsexpress4 为什么会自动在对象数组字段中插入_id?

java - maven project 和 sbt project and play framework 是否考虑共享一个存储库?

scala - SBT Assembly 从另一个 JAR 中删除类

Scala self 类型特征实例化

list - 数据框列中的嵌套列表,提取数据框列中列表的值 Pyspark Spark

apache-spark - Spark是否支持对S3中的 Parquet 文件进行真实的列扫描?

scala - Spark : Writing data frame to s3 bucket

php - 在 WooCommerce 中的产品 (Schema.org) 的结构化数据中添加 ean 代码 (gtin)