几天来,我一直在尝试让 spark 作业运行完成,我终于能够完成它,但仍然有大量失败的任务,其中执行者被杀死,并显示以下消息:
ExecutorLostFailure (executor 77 exited caused as one of the running tasks) 原因:容器因超出内存限制而被 YARN 杀死。使用了 45.1 GB 的 44.9 GB 物理内存。考虑提升 spark.yarn.executor.memoryOverhead
这些是我传递给集群的属性:
[
{
"classification": "spark-defaults",
"properties": {
"spark.executor.memory": "41000m",
"spark.driver.memory": "8000m",
"spark.executor.cores": "6",
"spark.shuffle.service.enabled": "true",
"spark.executor.instances": "98",
"spark.yarn.executor.memoryOverhead": "5000"
}
}
]
该集群由 20 台机器组成,每台机器具有 32 个内核和 240G 内存。我应该继续提高 memoryOverhead 还是有一点表明更深层次的问题。这次的错误似乎发生在将结果数据写入 S3 之前从 5000 个分区合并到 500 个分区的过程中。我猜合并导致了洗牌,并且由于集群的内存已经很低,所以它把它推得太远了。
工作流程如下:
- 将 parquet 文件从 s3 加载到数据框中
- 提取一组唯一键,这些键使用针对数据框的 sql 查询对数据进行分组
- 将数据帧转换为 JavaRDD 并应用多个映射函数
- MapToPair 数据
使用下面的 combineByKey,本质上是通过键将单个对象合并到对象数组中
combineByKey(新函数, 添加函数, 合并函数, new HashPartitioner(5000), false, null);
更多 map
- 对于几个唯一键中的每一个,过滤 rdd 以仅获取具有该键的元组,然后在合并后将这些子集中的每一个保存到磁盘
另一个问题是上面的44.9这个数字是怎么推导出来的。我认为最大内存将是执行程序内存 + memoryOverhead,即 46G 而不是 44.9G。
任何帮助将不胜感激, 弥敦道
最佳答案
根据我的经验,这表明存在更深层次的问题,并且从您发布的内容中我看到了一些陷阱。
首先,您可能想查看分区大小,因为 OOM 很容易由 combineByKey
操作期间创建的数据倾斜引起。可能有些键很频繁?
如果没有,我会查看 coalesce
函数调用。您还没有发布代码,所以我只能猜测正在生成的 DAG,但我会知道 coalesce
函数和在同一写入阶段执行的其他操作。
Spark 分阶段执行,从您的解释中我可以看出,您在 write
之前调用了 coalesce
,因此取决于您进入这个最终阶段的分区数量阶段并根据在此阶段完成的转换,您实际上可能在比所需分区更少的分区上操作,从而导致 OOM 异常。
用文字解释有点复杂,但我会尝试举一个简单的例子来说明可能发生的事情。
想象一下这样一个简单的场景,您在文件中读取 (Int, Double)
的键值对,然后将一些函数应用于所有值,例如 round
。然后您希望将输出写回单个文件,因此您调用 coalesce(1)
,然后调用 write
。代码看起来像这样:
val df = sqlContext.read.parquet("/path/to/my/file/")
df.map{case(key: Int, value: Double) => (key, round(value)}
.toDF()
.coalesce(1)
.write
.parquet("/my/output/path/")
现在有人可能会认为 map
操作是在您的整个集群上并行执行的,但是如果您注意 spark ui,您会注意到该任务并未分布在您的整个集群中。由于 coalesce(1)
,Spark 知道一切都需要在一个分区中结束,因此它只是开始将所有数据收集到一个分区中,应用 map
函数作为它继续。正如您可能想象的那样,通过更复杂的转换,这很容易导致 OOM 异常。
我希望这能为您提供一些有关查找位置的指示。祝你好运:)
关于memory-management - Spark ExecutorLostFailure 内存超出,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37615958/