I'm running a Spark job to aggregate data. I have a custom data structure called Profile, which essentially holds a mutable.HashMap[Zone, Double]. I want to merge all profiles that share a given key (a UUID), with the following code:
def merge = (up1: Profile, up2: Profile) => { up1.addWeights(up2); up1 }

val aggregated = dailyProfiles
  .aggregateByKey(new Profile(), 3200)(merge, merge).cache()
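For context, here is a minimal sketch of what Profile and addWeights might look like; Zone's shape, the field name weights, and the exact merge semantics are assumptions, not the poster's actual class:

```scala
import scala.collection.mutable

// Hypothetical stand-in for the poster's Zone type.
case class Zone(id: Int)

// Hypothetical Profile: a bag of per-zone weights that can absorb another profile.
class Profile {
  val weights = mutable.HashMap[Zone, Double]()

  // Merge the other profile's weights into this one, key by key.
  def addWeights(other: Profile): Unit =
    other.weights.foreach { case (zone, w) =>
      weights(zone) = weights.getOrElse(zone, 0.0) + w
    }
}
```

With a class like this, the merge function above mutates and returns its first argument, which is the usual pattern for aggregateByKey's in-place combiners.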
Curiously, Spark fails with the following error:
org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 116318 tasks (1024.0 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
The obvious solution is to increase "spark.driver.maxResultSize", but two things puzzle me. This error is supposed to occur when results are accumulated on the driver (e.g. via take() or collect()), yet I'm not bringing anything to the driver: I just read from HDFS, aggregate, and save back to HDFS. Does anyone know why I'm getting this error?
Best answer
Yes, it fails because the values shown in the exception message are rounded to one decimal place, while the actual comparison happens in bytes.
That serialized output must be more than 1024.0 MB and less than 1024.1 MB, so both sides round to the same string.
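The rounding is easy to reproduce: one byte over the default 1 GB limit already formats to the same "1024.0 MB" string as the limit itself. The %.1f megabyte formatting below mirrors what Utils.bytesToString does; the variable names are illustrative:

```scala
import java.util.Locale

val maxResultSize   = 1L << 30          // default spark.driver.maxResultSize: 1 GB, in bytes
val totalResultSize = maxResultSize + 1 // one byte over the limit

// Same "%.1f" megabyte formatting the error message uses.
def asMb(bytes: Long): String =
  "%.1f MB".formatLocal(Locale.US, bytes.toDouble / (1L << 20))

println(asMb(totalResultSize)) // 1024.0 MB
println(asMb(maxResultSize))   // 1024.0 MB
// The strings are identical, yet the byte comparison still trips:
assert(totalResultSize > maxResultSize)

// For scale: spread over 116318 tasks, the limit works out to roughly
// 9 KB of serialized result per task.
println((maxResultSize / 116318L) + " bytes per task") // 9231
```

This also shows why such a huge task count matters: even a few kilobytes of result per task add up past the limit.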
Check the Apache Spark code snippets added below; this error is quite interesting and very rare. :)

Here, the comparison totalResultSize > maxResultSize operates on two Long values held in bytes, but msg holds the rounded values produced by Utils.bytesToString():

// TaskSetManager.scala
def canFetchMoreResults(size: Long): Boolean = sched.synchronized {
  totalResultSize += size
  calculatedTasks += 1
  if (maxResultSize > 0 && totalResultSize > maxResultSize) {
    val msg = s"Total size of serialized results of ${calculatedTasks} tasks " +
      s"(${Utils.bytesToString(totalResultSize)}) is bigger than spark.driver.maxResultSize " +
      s"(${Utils.bytesToString(maxResultSize)})"
    logError(msg)
    abort(msg)
    false
  } else {
    true
  }
}
Apache Spark 1.3 - source

// Utils.scala
import java.util.Locale

def bytesToString(size: Long): String = {
  val TB = 1L << 40
  val GB = 1L << 30
  val MB = 1L << 20
  val KB = 1L << 10

  val (value, unit) = {
    if (size >= 2*TB) {
      (size.asInstanceOf[Double] / TB, "TB")
    } else if (size >= 2*GB) {
      (size.asInstanceOf[Double] / GB, "GB")
    } else if (size >= 2*MB) {
      (size.asInstanceOf[Double] / MB, "MB")
    } else if (size >= 2*KB) {
      (size.asInstanceOf[Double] / KB, "KB")
    } else {
      (size.asInstanceOf[Double], "B")
    }
  }
  "%.1f %s".formatLocal(Locale.US, value, unit)
}
Apache Spark 1.3 - source
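The 2*unit thresholds above also explain why the message reports "1024.0 MB" rather than "1.0 GB": a unit is only chosen once the size reaches twice it. Here is a standalone copy of the function for experimenting (toDouble replaces asInstanceOf[Double]; the behavior is the same):

```scala
import java.util.Locale

def bytesToString(size: Long): String = {
  val TB = 1L << 40; val GB = 1L << 30; val MB = 1L << 20; val KB = 1L << 10
  val (value, unit) =
    if (size >= 2 * TB)      (size.toDouble / TB, "TB")
    else if (size >= 2 * GB) (size.toDouble / GB, "GB")
    else if (size >= 2 * MB) (size.toDouble / MB, "MB")
    else if (size >= 2 * KB) (size.toDouble / KB, "KB")
    else                     (size.toDouble, "B")
  "%.1f %s".formatLocal(Locale.US, value, unit)
}

println(bytesToString(1L << 30))       // 1024.0 MB (still below the 2 GB cutoff)
println(bytesToString((1L << 30) + 1)) // 1024.0 MB (indistinguishable from the limit)
println(bytesToString(2L << 30))       // 2.0 GB
```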
Regarding scala - the scope of 'spark.driver.maxResultSize', a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/32530239/