apache-spark - 如何在减少之前避免大的中间结果?

标签 apache-spark mapreduce rdd

我在 spark 作业中遇到了一个让我感到惊讶的错误:

 Total size of serialized results of 102 tasks (1029.6 MB) is
 bigger than spark.driver.maxResultSize (1024.0 MB)

我的工作是这样的:

def add(a,b): return a+b
sums = rdd.mapPartitions(func).reduce(add)

rdd 有大约 500 个分区,func 获取该分区中的行并返回一个大数组(一个 1.3M double 组,或 ~10Mb)。 我想对所有这些结果求和并返回它们的总和。

Spark 似乎将 mapPartitions(func) 的总结果保存在内存中(大约 5GB),而不是增量处理它,后者只需要大约 30Mb。

除了增加 spark.driver.maxResultSize,有没有一种方法可以更增量地执行 reduce?


更新:实际上,我有点惊讶内存中保存了两个以上的结果。

最佳答案

当使用 reduce 时,Spark 对驱动程序应用最终归约。如果 func 返回单个对象,这实际上等同于:

reduce(add, rdd.collect())

您可以使用 treeReduce :

import math

# Keep maximum possible depth
rdd.treeReduce(add, depth=math.log2(rdd.getNumPartitions()))

toLocalIterator :

sum(rdd.toLocalIterator())

前者将以增加网络交换为代价递归地合并工作节点上的分区。您可以使用 depth 参数调整性能。

后者当时只会收集一个分区,但它可能需要重新评估 rdd 并且大部分工作将由驱动程序执行。

根据 func 中使用的确切逻辑,您还可以通过将矩阵拆分为 block 并按 block 执行加法来改进工作分配,例如使用 BlockMatrices

关于apache-spark - 如何在减少之前避免大的中间结果?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45333486/

相关文章:

scala - 在hadoop集群上安装Spark集群

hadoop - 在 mapreduce 中,为什么映射器不通过网络将输出键值直接发送到缩减器?

scala - RDD 的 foreachPartition 方法内的意外行为

scala - 从 RDD 中删除常量列并计算协方差矩阵

python - 如何使用 Spark 和 Caffe 对图像进行分类

python - Java 网关进程在向驱动程序发送其端口号之前退出

scala - 根据另一列将值映射到特定列

scala - 选择 DataFrame 中数组的最后一个元素

java - 为 cassandra 创建 ColumnFamilyInputFormat 的自定义 InputFormat

即使我将 numReducetasks 设置为 2,Hadoop 也只会生成一个输出文件