scala - Spark 内存限制超出问题

标签 scala hadoop apache-spark

我有一份在 spark 上运行的工作,它是使用 spark RDD 在 scala im 中编写的。由于昂贵的分组操作我得到这个错误: 容器因超出内存限制而被 YARN 终止。使用了 22.4 GB 的 22 GB 物理内存。考虑提升 spark.yarn.executor.memoryOverhead。 我增加了头顶的内存,但我得到了同样的结果。我使用 10 台 r4.xlarge 机器。我尝试使用 r4.2xlarge 甚至 r4.4xlarge,但也出现同样的错误。我正在测试的数据是 5GB 压缩数据(将近 50 个解压缩数据和近 600 万条记录)。

一些配置:

spark.executor.memory:20480M spark.driver.memory:21295M spark.yarn.executor.memoryOverhead:2g spark.executor.instances:10

代码如下所示:

val groupedEntitiesRDD = datasetRDD 
  .groupBy(_.entityId) 
  .map({ case (key, valueIterator) => key -> valueIterator.toList }) 
  .persist(StorageLevel.MEMORY_AND_DISK) 

val deduplicatedRDD = groupedEntitiesRDD 
  .flatMap({ case (_, entities) => deduplication(entities) }) 

def deduplication(entities: List[StreamObject[JsValue]]): List[StreamObject[JsValue]] = { 
  entities 
    .groupBy(_.deduplicationKey) 
    .values 
    .map(duplicates => duplicates.maxBy(_.processingTimestamp.toEpochSecond)) 
    .toList 
}

最佳答案

根据我的经验和我在 Spark 2.x 的发行说明中阅读的内容,需要分配比在 Spark 1.x。

你只分配了 2G 给 memoryOverhead 和 20GB 内存。我相信如果将其更改为 8G memoryOverhead 和 14GB executor 内存,您会得到更好的结果。

如果您仍然遇到内存问题(例如抛出实际的 OOM),您将需要查看数据偏差。尤其是groupBy操作会频繁导致严重的数据倾斜。

最后一件事,您写道您使用 RDD - 我希望您指的是 DataFramesDataSetsRDDsgroupBy 的性能非常低(参见 this 博客文章了解原因)所以如果你在 RDDs 你应该使用reduceByKey 代替。但本质上,您应该改用 DataFrames(或 DataSet),其中 groupBy 确实是正确的方法。

编辑!

您在评论中询问如何将 groupBy 转换为 reduceByKey。你可以这样做:

datasetRDD
  .map{case(entityID, streamObject) => (entityID, List(streamObject))}
  .reduceByKey(_++_)
  .flatMap{case(_, entities) => deduplication(entities)

您没有指定这些实体的数据结构,但看起来您正在寻找一些最大值并实际上丢弃了不需要的数据。这应该构建到 reduceByKey 操作中,这样您就可以在减少的同时过滤掉不必要的数据。

关于scala - Spark 内存限制超出问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45479813/

相关文章:

scala - 在 Spark 批处理作业中读取 Kafka 主题

scala - 如何为具有默认参数的案例类编写 Scala 提取器?

hadoop - 如何加载和存储nvarchar

java - 在Reducer中从HBase读取数据

Scala、cats - 如何使用 IO(或其他 monad)和 Either 创建无标记最终实现?

scala - 列表上的模式匹配以检查空

hadoop - 如何在 hadoop 实例类型之间找到正确的部分

json - 在读取 json 时解释 Spark 中的时间戳字段

python - 模块未找到错误 : No module named 'pyspark'

java - Spark MLlib 0.91 org.jblas.DoubleMatrix 错误