我在处理 Spark 作业时遇到困难,大约有一半的时间,它会选择处理单个节点上的所有数据,然后该节点内存不足并死亡。
问题:我怎样才能确保这种情况不会发生?
该系统在 Yarn 上使用 Spark 1.6.0,从 Hadoop 2.6 数据存储中提取,所有代码都是用 Java 编写的。我在具有十几个节点(亚马逊)的集群中动态分配资源。
DAG 比较简单:
RDD --> mapToPair \
coGroup --> flatMapToPair --> reduceByKey --> save
RDD --> mapToPair /
当它正确运行时,所有任务都会在集群中很好地分布,整个工作大约需要 20 分钟。我们将其称为“良好行为”。然而,有时 flatMapToPair 阶段有效地在单个执行程序中运行。我们将称之为“不良行为”
当我为“不良行为”作业加载 Spark UI 并深入到 flatMapToPair 阶段时,我看到实际上每个节点上运行大约 3-4 个执行程序(与“良好行为”情况相同) .但是,除了一个之外的所有执行程序都在几分之一秒内完成,其余的执行程序运行了 10 分钟,然后因超出内存限制而被 yarn 杀死。
我已经尝试过的东西:
我在回复评论时尝试过的事情
这是一个“有趣”的小黑森错误,添加调试日志后不良行为消失,然后在删除日志后保持消失,一段时间后又重新出现。我没有想法,所以如果有人甚至有一些推荐的诊断步骤,我都会倾听。
最佳答案
我遇到了一些非常相似的问题,虽然我对解决方案并不完全满意,因为我无法完全解释它为什么有效,但它似乎确实有效。就我而言,它是在 shuffle 之后,并且 shuffle 数据的大小相当小。问题在于,随后的计算显着增加了数据的大小,以至于在 1 或 2 个执行器上进行这些计算成为瓶颈。我最好的猜测是它与涉及数据源的首选位置和目标分区大小的启发式有关,可能与不知道在后期阶段发生的扩展相结合。
通过添加 coalesce(totalCores)
,我能够获得一致的、分布良好的洗牌。 ,其中 totalCores
定义为 spark.executor.instances
x spark.executor.cores
.它似乎也适用于 totalCores
的更大倍数。 ,但就我而言,我不需要更多的并行性。请注意,可能需要使用 repartition
而不是 coalesce
取决于用例。另外,这是在 spark 2.2.1 上,供引用。
关于apache-spark - 为什么 Spark 会选择在单个节点上完成所有工作?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54409371/