我已阅读有关 Spark 对 gzip 类型输入文件的支持 here ,我想知道是否对不同类型的压缩文件(例如 .zip 文件)存在相同的支持。到目前为止,我已经尝试计算压缩在 zip 文件下的文件,但 Spark 似乎无法成功读取其内容。
我已经查看了Hadoop的newAPIHadoopFile
和newAPIHadoopRDD
,但到目前为止我还没有能够让任何东西工作。
此外,Spark支持为指定文件夹下的每个文件创建分区,如下例所示:
SparkConf SpkCnf = new SparkConf().setAppName("SparkApp")
.setMaster("local[4]");
JavaSparkContext Ctx = new JavaSparkContext(SpkCnf);
JavaRDD<String> FirstRDD = Ctx.textFile("C:\input\).cache();
其中C:\input\
指向包含多个文件的目录。
如果可以计算压缩文件,是否也可以将每个文件打包在一个压缩文件下,并遵循每个文件一个分区?
最佳答案
Spark默认支持压缩文件
All of Spark’s file-based input methods, including textFile, support running on directories, compressed files, and wildcards as well. For example, you can use textFile("/my/directory"), textFile("/my/directory/.txt"), and textFile("/my/directory/.gz").
这可以通过提供有关 Hadoop 支持哪些压缩格式的信息来扩展,基本上可以通过查找扩展 CompressionCodec
的所有类来检查 ( docs )
name | ext | codec class
-------------------------------------------------------------
bzip2 | .bz2 | org.apache.hadoop.io.compress.BZip2Codec
default | .deflate | org.apache.hadoop.io.compress.DefaultCodec
deflate | .deflate | org.apache.hadoop.io.compress.DeflateCodec
gzip | .gz | org.apache.hadoop.io.compress.GzipCodec
lz4 | .lz4 | org.apache.hadoop.io.compress.Lz4Codec
snappy | .snappy | org.apache.hadoop.io.compress.SnappyCodec
来源:List the available hadoop codecs
因此,只需调用以下格式即可实现上述格式和更多可能性:
sc.readFile(path)
在 Spark 中读取 zip 文件
不幸的是,默认情况下,zip
不在受支持的列表中。
我发现了一篇很棒的文章:Hadoop: Processing ZIP files in Map/Reduce以及一些答案 ( example ) 解释如何将导入的 ZipFileInputFormat 与 sc.newAPIHadoopFile API 一起使用。但这对我不起作用。
我的解决方案
没有任何外部依赖项,您可以使用 sc.binaryFiles
加载文件,然后解压缩 PortableDataStream
读取内容。这就是我选择的方法。
import java.io.{BufferedReader, InputStreamReader}
import java.util.zip.ZipInputStream
import org.apache.spark.SparkContext
import org.apache.spark.input.PortableDataStream
import org.apache.spark.rdd.RDD
implicit class ZipSparkContext(val sc: SparkContext) extends AnyVal {
def readFile(path: String,
minPartitions: Int = sc.defaultMinPartitions): RDD[String] = {
if (path.endsWith(".zip")) {
sc.binaryFiles(path, minPartitions)
.flatMap { case (name: String, content: PortableDataStream) =>
val zis = new ZipInputStream(content.open)
// this solution works only for single file in the zip
val entry = zis.getNextEntry
val br = new BufferedReader(new InputStreamReader(zis))
Stream.continually(br.readLine()).takeWhile(_ != null)
}
} else {
sc.textFile(path, minPartitions)
}
}
}
使用这个隐式类,您需要导入它并调用readFile
SparkContext
上的方法:
import com.github.atais.spark.Implicits.ZipSparkContext
sc.readFile(path)
隐式类将正确加载您的 zip
文件并像以前一样返回 RDD[String]
。
注意:这只适用于 zip 存档中的单个文件!
对于 zip 支持中的多个文件,请检查此答案:https://stackoverflow.com/a/45958458/1549135
关于compression - Apache Spark 中的 Zip 支持,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28969757/