apache-spark - Spark groupBy out-of-memory woes

Tags: apache-spark

I'm doing a simple groupBy on a fairly small dataset (80 files in HDFS, not much data in total). I'm running Spark on 8 low-memory machines in a YARN cluster, i.e. something along the lines of:

spark-submit ... --master yarn-client --num-executors 8 --executor-memory 3000m --executor-cores 1

The dataset consists of strings of length 500-2000.

I'm trying to execute a simple groupByKey (see below), but it fails with a java.lang.OutOfMemoryError: GC overhead limit exceeded exception.

val keyvals = sc.newAPIHadoopFile("hdfs://...")
  .map( someobj.produceKeyValTuple )
keyvals.groupByKey().count()

I can count the group sizes using reduceByKey without any problems, which assures me the issue isn't caused by a single excessively large group, nor by an excessive number of groups:

keyvals.map(s => (s._1, 1)).reduceByKey((a,b) => a+b).collect().foreach(println)
// produces:
//  (key1,139368)
//  (key2,35335)
//  (key3,392744)
//  ...
//  (key13,197941)

I've tried reformatting, reshuffling and increasing the groupBy level of parallelism:

keyvals.groupByKey(24).count // fails
keyvals.groupByKey(3000).count // fails
keyvals.coalesce(24, true).groupByKey(24).count // fails
keyvals.coalesce(3000, true).groupByKey(3000).count // fails
keyvals.coalesce(24, false).groupByKey(24).count // fails
keyvals.coalesce(3000, false).groupByKey(3000).count // fails

I've also tried playing with spark.default.parallelism, and increasing spark.shuffle.memoryFraction to 0.8 while lowering spark.storage.memoryFraction to 0.1.
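
For reference, a minimal sketch of how those Spark 1.x settings could be set programmatically; the app name and the concrete parallelism value are illustrative assumptions, not the exact configuration used above:

import org.apache.spark.{SparkConf, SparkContext}

// Legacy Spark 1.x memory knobs mentioned above: give shuffle buffers more
// headroom and shrink the RDD cache, since nothing is cached in this job.
val conf = new SparkConf()
  .setAppName("groupByKey-oom")                // illustrative name
  .set("spark.default.parallelism", "3000")    // illustrative value
  .set("spark.shuffle.memoryFraction", "0.8")
  .set("spark.storage.memoryFraction", "0.1")
val sc = new SparkContext(conf)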

The failing stage (count) fails on task 2999 of 3000.

I can't seem to find anything suggesting that groupBy shouldn't just spill to disk instead of keeping things in memory, but I simply can't get it to work right, even on a fairly small dataset. This obviously shouldn't be the case, and I must be doing something wrong, but I have no idea where to start debugging!

Best answer

Patrick Wendell shed some light on the details of the groupBy operator on the mailing list. The takeaway message is the following:

Within a partition things will spill [...] This spilling can only occur across keys at the moment. Spilling cannot occur within a key at present. [...] Spilling within one key for GroupBy's is likely to end up in the next release of Spark, Spark 1.2. [...] If the goal is literally to just write out to disk all the values associated with each group, and the values associated with a single group are larger than fit in memory, this cannot be accomplished right now with the groupBy operator.

He further went on to suggest a workaround:

The best way to work around this depends a bit on what you are trying to do with the data down stream. Typically approaches involve sub-dividing any very large groups, for instance, appending a hashed value in a small range (1-10) to large keys. Then your downstream code has to deal with aggregating partial values for each group. If your goal is just to lay each group out sequentially on disk on one big file, you can call sortByKey with a hashed suffix as well. The sort functions are externalized in Spark 1.1 (which is in pre-release).
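
A minimal sketch of that key-salting idea in the question's own RDD style; the number of salts, the choice of hash, and the final size-based aggregation are illustrative assumptions, not code from the answer:

// Split each potentially huge group into numSalts smaller sub-groups by
// appending a hashed suffix to the key.
val numSalts = 10  // illustrative; pick so that each sub-group fits in memory
val salted = keyvals.map { case (k, v) =>
  ((k, (v.hashCode & Int.MaxValue) % numSalts), v)
}

// Each (key, salt) group is now small enough to materialize...
val subGroups = salted.groupByKey(3000)

// ...and downstream code must merge the partial results per original key,
// here simply recombining the sub-group sizes as an example.
subGroups.map { case ((k, _), vs) => (k, vs.size) }
  .reduceByKey(_ + _)
  .collect()
  .foreach(println)

The results after the salted groupByKey are only partial per key, so whatever runs downstream has to merge the sub-groups for each original key, exactly as the quoted advice says; to lay each group out sequentially on disk instead, sortByKey on the salted key would play the same role.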

The original question, apache-spark - Spark groupBy out-of-memory woes, can be found on Stack Overflow: https://stackoverflow.com/questions/25136064/
