我正在尝试实现一个以前在 Spark 中运行良好的 Hadoop Map/Reduce 作业。 Spark 应用程序定义如下:
val data = spark.textFile(file, 2).cache()
val result = data
.map(//some pre-processing)
.map(docWeightPar => (docWeightPar(0),docWeightPar(1))))
.flatMap(line => MyFunctions.combine(line))
.reduceByKey( _ + _)
MyFunctions.combine 的位置
def combine(tuples: Array[(String, String)]): IndexedSeq[(String,Double)] =
for (i <- 0 to tuples.length - 2;
j <- 1 to tuples.length - 1
) yield (toKey(tuples(i)._1,tuples(j)._1),tuples(i)._2.toDouble * tuples(j)._2.toDouble)
如果用于输入的列表很大,则 combine
函数会生成大量映射键,这就是引发异常的地方。
在 Hadoop Map Reduce 设置中,我没有遇到问题,因为这是 combine
函数产生的点,也是 Hadoop 将映射对写入磁盘的点。 Spark 似乎将所有内容都保留在内存中,直到它因 java.lang.OutOfMemoryError: 超出 GC 开销限制
而爆炸。
我可能做了一些非常基本的错误,但我找不到任何关于如何解决此问题的指示,我想知道如何避免这种情况。由于我对 Scala 和 Spark 完全是菜鸟,所以我不确定问题是来自其中一个还是另一个,或者两者兼而有之。我目前正在尝试在自己的笔记本电脑上运行此程序,它适用于元组数组长度不是很长的输入。
最佳答案
启动 spark-shell
或 spark-submit
时添加以下 JVM 参数:
-Dspark.executor.memory=6g
您还可以考虑在创建 SparkContext
实例时显式设置工作线程数量:
分布式集群
在conf/slaves
中设置从属名称:
val sc = new SparkContext("master", "MyApp")
关于scala - 为什么 Spark 会失败并出现 java.lang.OutOfMemoryError : GC overhead limit exceeded?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27462061/