csv - Spark SQL如何读取压缩的csv文件?

标签 csv apache-spark apache-spark-sql

我尝试使用 API spark.read.csv 读取扩展名为 bzgzip 的压缩 csv 文件。有效。但在源代码中我没有找到任何可以声明 codec 类型的选项参数。

即使在这个 link ,写入端只有codec的设置。谁能告诉我或提供显示 Spark 2.x 版本如何处理压缩的 csv 文件的源代码路径。

最佳答案

所有与文本相关的数据源,包括 CSVDataSource ,使用Hadoop File API来处理文件(Spark Core的RDD中也有)。

您可以在 readFile 中找到相关行这导致 HadoopFileLinesReader其中有以下几行:

val fileSplit = new FileSplit(
  new Path(new URI(file.filePath)),
  file.start,
  file.length,
  // TODO: Implement Locality
  Array.empty)

使用 Hadoop 的 org.apache.hadoop.fs.Path处理底层文件的压缩。

<小时/>

快速谷歌搜索后,我找到了处理压缩的 Hadoop 属性,即 mapreduce.output.fileoutputformat.compress

这让我找到了 Spark SQL 的 CompressionCodecs使用以下压缩配置:

"none" -> null,
"uncompressed" -> null,
"bzip2" -> classOf[BZip2Codec].getName,
"deflate" -> classOf[DeflateCodec].getName,
"gzip" -> classOf[GzipCodec].getName,
"lz4" -> classOf[Lz4Codec].getName,
"snappy" -> classOf[SnappyCodec].getName)

在下面的代码中,您可以找到 setCodecConfiguration使用“我们的”选项。

  def setCodecConfiguration(conf: Configuration, codec: String): Unit = {
    if (codec != null) {
      conf.set("mapreduce.output.fileoutputformat.compress", "true")
      conf.set("mapreduce.output.fileoutputformat.compress.type", CompressionType.BLOCK.toString)
      conf.set("mapreduce.output.fileoutputformat.compress.codec", codec)
      conf.set("mapreduce.map.output.compress", "true")
      conf.set("mapreduce.map.output.compress.codec", codec)
    } else {
      // This infers the option `compression` is set to `uncompressed` or `none`.
      conf.set("mapreduce.output.fileoutputformat.compress", "false")
      conf.set("mapreduce.map.output.compress", "false")
    }
  }

另一种方法getCodecClassName用于解析JSON压缩选项, CSV ,和text格式。

关于csv - Spark SQL如何读取压缩的csv文件?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44807144/

相关文章:

scala - 在 Spark-Scala 中将 Dataset[Row] 转换为 RDD[Array[String]] 的最佳方法?

linux - 如何使用 bash 在 linux 中用空格替换 csv 文件中的第三个逗号?

c# - 如何从逗号分隔的文件中读取值?

python - 为什么第二个for循环只迭代一次?

apache-spark - 用 pyspark 用以前已知的好值填充 null

apache-spark - 如何知道 Spark 集群 'participate' 中的机器是否有作业

java - 在 Spark UDF JAVA 中传递额外变量

apache-spark - 间隔 30 天转换为间隔 4 周 2 天

mysql - 将包含时间数据的 CSV 导入 MySQL 时,为什么会收到 '1292 Truncated' 警告?

apache-spark - FileNotFoundException : Spark save fails. 无法从数据集 [T] avro 清除缓存