java - 使用 SPARK 从 zip 到 seq

标签 java scala apache-spark hdfs out-of-memory

我每天都会收到一个 zip 存档“2018-06-26.zip”,大小约为。 250 Mb 压缩后,包含 165-170.000 个小 XML 文件 (Kb)。我将 zip 存档加载到 HDFS(避免小文件问题),并使用 SPARK 从 zip 中提取它们(zip 不可拆分),制作配对 RDD,以文件名作为键,以内容作为值并保存它们通过配对的 RDD 作为序列文件。一切都运行顺利,一个小的 zip 存档仅包含 3 个用于测试目的的 XML 文件,但是当我向它提供大存档时,我得到了

   java.lang.OutOfMemoryError: GC overhead limit exceeded
   at java.util.Arrays.copyOf(Arrays.java:2367)
   at java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:130)
   ...
   ...

我在 Cloudera Quickstart VM 上运行:CDH 5.13.3(HDFS:2.60、JDK:1.7.0.67、SPARK:1.6.0、Scala 2.10)

我还没有在成熟的集群上运行它,因为我想在部署它之前确保我的代码是正确的......

垃圾收集器在超出开销限制的情况下继续运行 OOM。我知道要增加驱动程序和 Java 堆空间的内存量,但我怀疑我的方法占用了太多内存......监视内存使用情况,但没有发现任何内存泄漏......

代码如下:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import java.util.zip.{ZipEntry, ZipInputStream}
import org.apache.spark.input.PortableDataStream
import scala.collection.mutable
val conf = new SparkConf().setMaster("local").setAppName("ZipToSeq")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
var xml_map = new mutable.HashMap[String, String]()
sc.binaryFiles("/user/cloudera/test/2018-06-26.zip", 10).collect
   .foreach { zip_file : (String, PortableDataStream) =>
    val zip_stream : ZipInputStream = new ZipInputStream(zip_file._2.open)
    var zip_entry : ZipEntry = null
    while ({zip_entry = zip_stream.getNextEntry; zip_entry != null}) {
      if (!zip_entry.isDirectory) {
        val key_file_name = zip_entry.getName
        val value_file_content = scala.io.Source.fromInputStream(zip_stream, "iso-8859-1").getLines.mkString("\n")
        xml_map += ( key_file_name -> value_file_content )
      }
      zip_stream.closeEntry()
    }
    zip_stream.close()
  }
val xml_rdd = sc.parallelize(xml_map.toSeq).saveAsSequenceFile("/user/cloudera/2018_06_26")

非常感谢任何帮助或想法。

最佳答案

我的最终解决方案:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import java.util.zip.{ZipEntry, ZipInputStream}
import org.apache.spark.input.PortableDataStream
import scala.collection.mutable
val conf = new SparkConf().setMaster("local").setAppName("ZipToSeq")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
var xml_map = new mutable.HashMap[String, String]()
sc.binaryFiles("/user/cloudera/test/2018-06-26.zip").collect
   .foreach { zip_file : (String, PortableDataStream) =>
   val zip_stream : ZipInputStream = new ZipInputStream(zip_file._2.open)
   var zip_entry : ZipEntry = null
   while ({zip_entry = zip_stream.getNextEntry; zip_entry != null}) {
      if (!zip_entry.isDirectory) {
      val key_file_name = zip_entry.getName
      val value_file_content = scala.io.Source.fromInputStream(zip_stream, "iso-8859-1").getLines.mkString("\n")
      xml_map += ( key_file_name -> value_file_content )
   }
   zip_stream.closeEntry()
  }
  zip_stream.close()
}
val xml_rdd = sc.parallelize(xml_map.toSeq, 75).saveAsSequenceFile("/user/cloudera/2018_06_26")

原始 zip 文件 325 Mb,包含 170.000 个 XML 文件 结果是 75 个分区,每个分区大约35 MB。总计约 2.5 GB 在我的 Windows PC 上本地运行时间:1.2 分钟:-)

关于java - 使用 SPARK 从 zip 到 seq,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52254617/

相关文章:

单个面板的 Java Swing GlassPane 功能?

java - java中没有AtomicInteger如何保证原子性?

scala - 在 argonaut 中转换 JSON 字段名称

apache-spark - pyspark : NameError: name 'spark' is not defined

java - 为什么我的小程序无法在没有 JDK 的计算机上运行?

java - e : java. io.IOException : Cannot run program "java -jar bg.jar": CreateProcess error=2, 系统找不到该文件

java - 懒惰斐波那契数列

Scala 2.10 "No implicit view available"请求 View 的类型参数错误

apache-spark - 用 pyspark 用以前已知的好值填充 null

java - 为什么 apache spark 不适用于 java 10?我们得到非法反射然后 java.lang.IllegalArgumentException