我有一份在 spark 上运行的工作,它是使用 spark RDD 在 scala im 中编写的。由于昂贵的分组操作我得到这个错误:
容器因超出内存限制而被 YARN 终止。使用了 22.4 GB 的 22 GB 物理内存。考虑提升 spark.yarn.executor.memoryOverhead
。
我增加了头顶的内存,但我得到了同样的结果。我使用 10 台 r4.xlarge 机器。我尝试使用 r4.2xlarge 甚至 r4.4xlarge,但也出现同样的错误。我正在测试的数据是 5GB 压缩数据(将近 50 个解压缩数据和近 600 万条记录)。
一些配置:
spark.executor.memory
:20480M
spark.driver.memory
:21295M
spark.yarn.executor.memoryOverhead
:2g
spark.executor.instances
:10
代码如下所示:
val groupedEntitiesRDD = datasetRDD
.groupBy(_.entityId)
.map({ case (key, valueIterator) => key -> valueIterator.toList })
.persist(StorageLevel.MEMORY_AND_DISK)
val deduplicatedRDD = groupedEntitiesRDD
.flatMap({ case (_, entities) => deduplication(entities) })
def deduplication(entities: List[StreamObject[JsValue]]): List[StreamObject[JsValue]] = {
entities
.groupBy(_.deduplicationKey)
.values
.map(duplicates => duplicates.maxBy(_.processingTimestamp.toEpochSecond))
.toList
}
最佳答案
根据我的经验和我在 Spark 2.x 的发行说明中阅读的内容,需要分配比在 Spark 1.x。
你只分配了 2G 给 memoryOverhead 和 20GB 内存。我相信如果将其更改为 8G memoryOverhead 和 14GB executor 内存,您会得到更好的结果。
如果您仍然遇到内存问题(例如抛出实际的 OOM),您将需要查看数据偏差。尤其是groupBy
操作会频繁导致严重的数据倾斜。
最后一件事,您写道您使用 RDD
- 我希望您指的是 DataFrames
或 DataSets
? RDDs
与 groupBy
的性能非常低(参见 this 博客文章了解原因)所以如果你在 RDDs
你应该使用reduceByKey
代替。但本质上,您应该改用 DataFrames(或 DataSet),其中 groupBy
确实是正确的方法。
编辑!
您在评论中询问如何将 groupBy
转换为 reduceByKey
。你可以这样做:
datasetRDD
.map{case(entityID, streamObject) => (entityID, List(streamObject))}
.reduceByKey(_++_)
.flatMap{case(_, entities) => deduplication(entities)
您没有指定这些实体的数据结构,但看起来您正在寻找一些最大值并实际上丢弃了不需要的数据。这应该构建到 reduceByKey
操作中,这样您就可以在减少的同时过滤掉不必要的数据。
关于scala - Spark 内存限制超出问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45479813/