scala - Spark/Scala 打开压缩的 CSV 文件

标签 scala apache-spark

我是 Spark 和 Scala 新手。我们有 CSV 格式的广告事件日志文件,然后使用 pkzip 进行压缩。我见过很多关于如何使用 Java 解压缩压缩文件的示例,但是如何使用 Scala for Spark 来解压压缩文件呢?最终,我们希望从每个传入文件中获取、提取数据并将其加载到 Hbase 目标表中。也许这可以通过 HadoopRDD 来完成?之后,我们将引入 Spark Streaming 来监视这些文件。

谢谢, 本

最佳答案

默认压缩支持

@samthebest如果您使用 Spark (Hadoop) 中默认提供的压缩格式,则答案是正确的。分别是:

  • bzip2
  • gzip
  • lz4
  • 活泼

我在其他答案中更深入地解释了这个主题:https://stackoverflow.com/a/45958182/1549135

正在阅读 zip

但是,如果您尝试读取 zip 文件,则需要创建自定义解决方案。我已经提供的答案中提到了一个。

如果您需要从存档中读取多个文件,您可能会对我提供的答案感兴趣:https://stackoverflow.com/a/45958458/1549135

基本上,一直使用sc.binaryFiles,然后解压缩PortableDataStream,如示例中所示:

sc.binaryFiles(path, minPartitions)
  .flatMap { case (name: String, content: PortableDataStream) =>
    val zis = new ZipInputStream(content.open)
    Stream.continually(zis.getNextEntry)
          .takeWhile(_ != null)
          .flatMap { _ =>
              val br = new BufferedReader(new InputStreamReader(zis))
              Stream.continually(br.readLine()).takeWhile(_ != null)
          }

关于scala - Spark/Scala 打开压缩的 CSV 文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/21866489/

相关文章:

java - Scala如何将 "None"排序到底部(如果存在)并选择每组中的第一行?

scala - `scala.Any` 的声明在哪里?

scala - 工厂对象和案例类

scala - 在 spark 中对多个 DataFrame 执行连接

带通配符的 Scala 过滤器?

scala - 从数据框中获取按唯一 ID 分组的前 n 条记录

scala - SparkSession.Builder 失败,错误代码 "A master URL must be set in your configuration": "spark.master" is set to "local"

scala - Spark Map 列中最大值对应的键

scala - 使用 Akka 测试套件从 ActorRef 获取底层 Actor

apache-spark - 从谷歌存储 gs ://filesystem from local spark instance 读取