scala - 'spark.driver.maxResultSize' 的范围

标签 scala apache-spark apache-spark-1.3

我正在运行 Spark 作业来聚合数据。我有一个名为 Profile 的自定义数据结构,它基本上包含一个 mutable.HashMap[Zone, Double] .我想使用以下代码合并共享给定 key (UUID)的所有配置文件:

def merge = (up1: Profile, up2: Profile) => { up1.addWeights(up2); up1}
val aggregated = dailyProfiles
  .aggregateByKey(new Profile(), 3200)(merge, merge).cache()

奇怪的是,Spark 失败并出现以下错误:

org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 116318 tasks (1024.0 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)



显而易见的解决方案是增加“spark.driver.maxResultSize”,但有两件事让我感到困惑。
  • 太巧了,我得到的 1024.0 大于 1024.0
  • 我在谷歌上搜索此特定错误和配置参数时发现的所有文档和帮助都表明它会影响将值返回给驱动程序的函数。 (比如 take()collect() ),但我没有给驱动程序带来任何东西,只是从 HDFS 读取,聚合,保存回 HDFS。

  • 有谁知道我为什么会收到这个错误?

    最佳答案

    Yes, It's failing because The values we see in exception message are rounded off by one precision and comparison happening in bytes.

    That serialized output must be more than 1024.0 MB and less than 1024.1 MB.


    检查添加的 Apache Spark 代码片段,这个错误非常有趣并且非常罕见。 :)
    这里totalResultSize > maxResultSize两者都是 Long 类型并且 in 以字节为单位保存值。但是msg保存来自 Utils.bytesToString() 的舍入值.
    //TaskSetManager.scala
      def canFetchMoreResults(size: Long): Boolean = sched.synchronized {
        totalResultSize += size
        calculatedTasks += 1
        if (maxResultSize > 0 && totalResultSize > maxResultSize) {
          val msg = s"Total size of serialized results of ${calculatedTasks} tasks " +
            s"(${Utils.bytesToString(totalResultSize)}) is bigger than spark.driver.maxResultSize " +
            s"(${Utils.bytesToString(maxResultSize)})"
          logError(msg)
          abort(msg)
          false
        } else {
          true
        }
      }
    
    Apache Spark 1.3 - source
    //Utils.scala
      def bytesToString(size: Long): String = {
        val TB = 1L << 40
        val GB = 1L << 30
        val MB = 1L << 20
        val KB = 1L << 10
    
        val (value, unit) = {
          if (size >= 2*TB) {
            (size.asInstanceOf[Double] / TB, "TB")
          } else if (size >= 2*GB) {
            (size.asInstanceOf[Double] / GB, "GB")
          } else if (size >= 2*MB) {
            (size.asInstanceOf[Double] / MB, "MB")
          } else if (size >= 2*KB) {
            (size.asInstanceOf[Double] / KB, "KB")
          } else {
            (size.asInstanceOf[Double], "B")
          }
        }
        "%.1f %s".formatLocal(Locale.US, value, unit)
      }
    
    Apache Spark 1.3 - source

    关于scala - 'spark.driver.maxResultSize' 的范围,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32530239/

    相关文章:

    apache-spark - 如何制作(Spark1.6)saveAsTextFile 来附加现有文件?

    python - Pyspark 数据框 : Summing over a column while grouping over another

    hadoop - Spark 需要 2 秒才能数到 10 ...?

    apache-spark - 如何启用从 Cassandra 到 Spark 的流式传输?

    json - Play Framework : How to replace all the occurrence of a value in a JSON tree

    scala - Sbt:何时使用 testQuick 以及它如何确定要跳过哪些测试?

    java - 如何让 sbt 在发布到 Maven 存储库时用实际日期替换 SNAPSHOT?

    hadoop - 什么是 "Hadoop"- Hadoop 的定义?

    java - RDD.saveAsTextFile后的空文件是什么?

    generics - 将枚举转换为迭代器