apache-spark - 为什么 Spark 会选择在单个节点上完成所有工作?

标签 apache-spark hadoop yarn

我在处理 Spark 作业时遇到困难,大约有一半的时间,它会选择处理单个节点上的所有数据,然后该节点内存不足并死亡。

问题:我怎样才能确保这种情况不会发生?

该系统在 Yarn 上使用 Spark 1.6.0,从 Hadoop 2.6 数据存储中提取,所有代码都是用 Java 编写的。我在具有十几个节点(亚马逊)的集群中动态分配资源。

DAG 比较简单:

RDD --> mapToPair \  
                   coGroup --> flatMapToPair --> reduceByKey --> save
RDD --> mapToPair /

当它正确运行时,所有任务都会在集群中很好地分布,整个工作大约需要 20 分钟。我们将其称为“良好行为”。然而,有时 flatMapToPair 阶段有效地在单个执行程序中运行。我们将称之为“不良行为”

当我为“不良行为”作业加载 Spark UI 并深入到 flatMapToPair 阶段时,我看到实际上每个节点上运行大约 3-4 个执行程序(与“良好行为”情况相同) .但是,除了一个之外的所有执行程序都在几分之一秒内完成,其余的执行程序运行了 10 分钟,然后因超出内存限制而被 yarn 杀死。

我已经尝试过的东西:
  • 网络。搜索诸如“spark 在一个节点上运行”之类的内容和变体几乎普遍会导致人们在 spark shell 中以本地模式运行或出现类似的配置问题。鉴于我至少在某些时候表现良好,这些类型的配置问题似乎不太可能(并且我已经检查过我不是意外地处于本地模式,我有大约 100 个分区,......)。
  • 在同一集群上运行的其他 Spark 作业表现良好。这似乎排除了一些集群范围的错误配置(见鬼,有时甚至这个工作运行良好)。
  • 集群利用率似乎不会影响我得到好的行为还是坏的行为。当集群被大量使用时和集群根本没有其他任何东西运行时,我都看到了这两种行为。
  • 这似乎不是一个 yarn 问题,因为执行器都在集群中分布良好。当然,我可能对此有误,但问题似乎确实是执行者之间的工作分配。
  • 数据集中有多个键。我在 coGroup 和 flatMapToPair 之间插入了一个 countByKey 并打印了结果(对于 20 个左右人口最多的键)。数据在这些顶级键中分布相当均匀。

  • 我在回复评论时尝试过的事情
  • 在 flatMapToPair 调用之前重新分区 RDD 以强制 500 个分区。这只会将不良行为转移到重新分区阶段。
  • 增加默认并行度。我确实通过这种方式获得了更多分区,但不良行为仍然存在于 flatMapToPair 阶段。
  • 剥离数据(实际上我在发布之前做了很多,但未能将其包含在原始列表中)。我们只有几十个 GB,而且我已经加载了我需要的最低限度的数据。

  • 这是一个“有趣”的小黑森错误,添加调试日志后不良行为消失,然后在删除日志后保持消失,一段时间后又重新出现。我没有想法,所以如果有人甚至有一些推荐的诊断步骤,我都会倾听。

    最佳答案

    我遇到了一些非常相似的问题,虽然我对解决方案并不完全满意,因为我无法完全解释它为什么有效,但它似乎确实有效。就我而言,它是在 shuffle 之后,并且 shuffle 数据的大小相当小。问题在于,随后的计算显着增加了数据的大小,以至于在 1 或 2 个执行器上进行这些计算成为瓶颈。我最好的猜测是它与涉及数据源的首选位置和目标分区大小的启发式有关,可能与不知道在后期阶段发生的扩展相结合。

    通过添加 coalesce(totalCores),我能够获得一致的、分布良好的洗牌。 ,其中 totalCores定义为 spark.executor.instances x spark.executor.cores .它似乎也适用于 totalCores 的更大倍数。 ,但就我而言,我不需要更多的并行性。请注意,可能需要使用 repartition而不是 coalesce取决于用例。另外,这是在 spark 2.2.1 上,供引用。

    关于apache-spark - 为什么 Spark 会选择在单个节点上完成所有工作?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54409371/

    相关文章:

    scala - 找到 : org. apache.spark.sql.Dataset[(Double, Double)] 需要 : org. apache.spark.rdd.RDD[(Double, Double)]

    java - Hadoop 字数统计 MapReduce 教程 已弃用

    date - 如何在Hive中将十六进制日期值转换为日期值

    hadoop - 在 yarn 上运行 Spark 机学习示例失败

    python - 从文本pyspark中提取字符串

    apache-spark - 为在同一台机器上运行的多个执行程序导出 spark 执行程序 jmx 指标

    mysql - 在Java应用程序中使用JDBC并行读取大数据的标准算法或模式

    hadoop - 客户端应用程序的Hadoop conf

    hadoop - yarn 应用历史记录存储在哪里?

    apache-spark - 如何在Apache Spark中链接多个作业