我在 spark 作业中遇到了一个让我感到惊讶的错误:
Total size of serialized results of 102 tasks (1029.6 MB) is
bigger than spark.driver.maxResultSize (1024.0 MB)
我的工作是这样的:
def add(a,b): return a+b
sums = rdd.mapPartitions(func).reduce(add)
rdd 有大约 500 个分区,func 获取该分区中的行并返回一个大数组(一个 1.3M double 组,或 ~10Mb)。 我想对所有这些结果求和并返回它们的总和。
Spark 似乎将 mapPartitions(func) 的总结果保存在内存中(大约 5GB),而不是增量处理它,后者只需要大约 30Mb。
除了增加 spark.driver.maxResultSize,有没有一种方法可以更增量地执行 reduce?
更新:实际上,我有点惊讶内存中保存了两个以上的结果。
最佳答案
当使用 reduce
时,Spark 对驱动程序应用最终归约。如果 func
返回单个对象,这实际上等同于:
reduce(add, rdd.collect())
您可以使用 treeReduce
:
import math
# Keep maximum possible depth
rdd.treeReduce(add, depth=math.log2(rdd.getNumPartitions()))
sum(rdd.toLocalIterator())
前者将以增加网络交换为代价递归地合并工作节点上的分区。您可以使用 depth
参数调整性能。
后者当时只会收集一个分区,但它可能需要重新评估 rdd
并且大部分工作将由驱动程序执行。
根据 func
中使用的确切逻辑,您还可以通过将矩阵拆分为 block 并按 block 执行加法来改进工作分配,例如使用 BlockMatrices
关于apache-spark - 如何在减少之前避免大的中间结果?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45333486/