compression - Apache Spark 中的 Zip 支持

标签 compression zip apache-spark

我已阅读有关 Sparkgzip 类型输入文件的支持 here ,我想知道是否对不同类型的压缩文件(例如 .zip 文件)存在相同的支持。到目前为止,我已经尝试计算压缩在 zip 文件下的文件,但 Spark 似乎无法成功读取其内容。

我已经查看了HadoopnewAPIHadoopFilenewAPIHadoopRDD,但到目前为止我还没有能够让任何东西工作。

此外,Spark支持为指定文件夹下的每个文件创建分区,如下例所示:

SparkConf SpkCnf = new SparkConf().setAppName("SparkApp")
                                  .setMaster("local[4]");

JavaSparkContext Ctx = new JavaSparkContext(SpkCnf);

JavaRDD<String> FirstRDD = Ctx.textFile("C:\input\).cache();

其中C:\input\指向包含多个文件的目录。

如果可以计算压缩文件,是否也可以将每个文件打包在一个压缩文件下,并遵循每个文件一个分区?

最佳答案

Spark默认支持压缩文件

根据Spark Programming Guide

All of Spark’s file-based input methods, including textFile, support running on directories, compressed files, and wildcards as well. For example, you can use textFile("/my/directory"), textFile("/my/directory/.txt"), and textFile("/my/directory/.gz").

这可以通过提供有关 Hadoop 支持哪些压缩格式的信息来扩展,基本上可以通过查找扩展 CompressionCodec 的所有类来检查 ( docs )

name    | ext      | codec class
-------------------------------------------------------------
bzip2   | .bz2     | org.apache.hadoop.io.compress.BZip2Codec 
default | .deflate | org.apache.hadoop.io.compress.DefaultCodec 
deflate | .deflate | org.apache.hadoop.io.compress.DeflateCodec 
gzip    | .gz      | org.apache.hadoop.io.compress.GzipCodec 
lz4     | .lz4     | org.apache.hadoop.io.compress.Lz4Codec 
snappy  | .snappy  | org.apache.hadoop.io.compress.SnappyCodec

来源:List the available hadoop codecs

因此,只需调用以下格式即可实现上述格式和更多可能性:

sc.readFile(path)

在 Spark 中读取 zip 文件

不幸的是,默认情况下,zip 不在受支持的列表中。

我发现了一篇很棒的文章:Hadoop: Processing ZIP files in Map/Reduce以及一些答案 ( example ) 解释如何将导入的 ZipFileInputFormat 与 sc.newAPIHadoopFile API 一起使用。但这对我不起作用

我的解决方案

没有任何外部依赖项,您可以使用 sc.binaryFiles 加载文件,然后解压缩 PortableDataStream 读取内容。这就是我选择的方法。

import java.io.{BufferedReader, InputStreamReader}
import java.util.zip.ZipInputStream
import org.apache.spark.SparkContext
import org.apache.spark.input.PortableDataStream
import org.apache.spark.rdd.RDD

implicit class ZipSparkContext(val sc: SparkContext) extends AnyVal {

    def readFile(path: String,
                 minPartitions: Int = sc.defaultMinPartitions): RDD[String] = {

      if (path.endsWith(".zip")) {
        sc.binaryFiles(path, minPartitions)
          .flatMap { case (name: String, content: PortableDataStream) =>
            val zis = new ZipInputStream(content.open)
            // this solution works only for single file in the zip
            val entry = zis.getNextEntry
            val br = new BufferedReader(new InputStreamReader(zis))
            Stream.continually(br.readLine()).takeWhile(_ != null)
          }
      } else {
        sc.textFile(path, minPartitions)
      }
    }
  }

使用这个隐式类,您需要导入它并调用readFile SparkContext 上的方法:

import com.github.atais.spark.Implicits.ZipSparkContext
sc.readFile(path)

隐式类将正确加载您的 zip 文件并像以前一样返回 RDD[String]

注意:这只适用于 zip 存档中的单个文件!
对于 zip 支持中的多个文件,请检查此答案:https://stackoverflow.com/a/45958458/1549135

关于compression - Apache Spark 中的 Zip 支持,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28969757/

相关文章:

excel - vba,如何压缩带有特殊字符的文件?

asp.net - 使用 Asp.Net 进行 CSS/JS GZip 压缩

javascript - 如何提取 zip 文件并呈现内容,客户端

apache-spark - 'replaceWhere'会导致删除吗?

scala - 如何从 Scala 方法创建 UDF(计算 md5)?

amazon-web-services - 从本地连接到客户端 cassandra

c# - 在 C# 中压缩和解压缩文件夹

java - 如何使用 spring boot 压缩下载的文件

php - 无法创建 ZIP 存档,但没有错误消息

C++ 将整数的大小压缩到 2 位?