java - Spark 作业在 takeSample 上用完堆内存

标签 java scala apache-spark cloud

我有一个 Apache spark 集群,其中包含一个主节点和三个工作节点。每个工作节点有 32 个内核和 124G 内存。我在 HDFS 中还有一个数据集,其中包含大约 6.5 亿条文本记录。这个数据集是一些像这样读入的序列化 RDD:

import org.apache.spark.mllib.linalg.{Vector, Vectors, SparseVector}
val vectors = sc.objectFile[(String, SparseVector)]("hdfs://mn:8020/data/*")

我想从这些记录中提取一百万个样本来做一些分析,所以我想我会尝试 val sample = vectors.takeSample(false, 10000, 0)。但是,最终失败并显示此错误消息:

 15/08/25 09:48:27 ERROR Utils: Uncaught exception in thread task-result-getter-3
java.lang.OutOfMemoryError: Java heap space
        at org.apache.spark.scheduler.DirectTaskResult$$anonfun$readExternal$1.apply$mcV$sp(TaskResult.scala:64)
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1239)
        at org.apache.spark.scheduler.DirectTaskResult.readExternal(TaskResult.scala:61)
        at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1837)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:69)
        at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:89)
        at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:79)
        at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
        at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1772)
        at org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:50)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
Exception in thread "task-result-getter-3" java.lang.OutOfMemoryError: Java heap space
        at org.apache.spark.scheduler.DirectTaskResult$$anonfun$readExternal$1.apply$mcV$sp(TaskResult.scala:64)
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1239)
        at org.apache.spark.scheduler.DirectTaskResult.readExternal(TaskResult.scala:61)
        at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1837)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:69)
        at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:89)
        at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$r

我知道我的堆空间快用完了(我想是在驱动程序上?),这是有道理的。执行 hadoop fs -du -s/path/to/data,数据集在磁盘上占用 2575 GB(但大小仅为 ~850 GB)。

所以,我的问题是,我该如何提取这个包含 1000000 条记录的样本(我稍后计划将其序列化到磁盘)?我知道我可以用较小的样本量做 takeSample() 并在以后聚合它们,但我认为我只是没有设置正确的配置或做错了什么,这阻止了我这样做我喜欢的方式。

最佳答案

在处理大数据时,在驱动程序节点收集中间结果很少是个好主意。相反,将数据分布在集群中几乎总是更好。这同样适用于您要采集的样本。

如果您想对数据集的 1000000 个元素进行采样,然后将其写入磁盘,那么为什么不在驱动程序处收集样本并将其写入磁盘呢?下面的代码片段应该可以做到这一点

val sample = vectors.zipWithIndex().filter(_._1 < 1000000).map(_._2)

sample.saveAsObjectFile("path to file")

关于java - Spark 作业在 takeSample 上用完堆内存,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32206785/

相关文章:

java - 反射获取特定类中带注释的所有方法

Scala Cast 对象到另一种类型

java - 使用java Spark将数据集保存到cassandra

java - com.mongodb.MongoTimeoutException : Timed out after 10000 ms while waiting to connect

java - 如何添加正则表达式来匹配字母数字字符和一些特殊字符

java - 执行时按下的按钮外观

r - sparklyr:如何跨组获取平衡样本

java - 在嵌套 Map 中使用 groupingBy,但收集到不同类型的对象

scala - flatMap 功能签名(输入-> 输出)是否建议进行任何展平?

scala - 在不同列的 Spark 中读取 csv 文件