我有如下代码:
// make a rd according to an id
def makeRDD(id:Int, data:RDD[(VertexId, Double)]):RDD[(Long, Double)] = { ... }
val data:RDD[(VertexId, Double)] = ... // loading from hdfs
val idList = (1 to 100)
val rst1 = idList.map(id => makeRDD(id, data)).reduce(_ union _).reduceByKey(_+_)
val rst2 = idList.map(id => makeRDD(id, data)).reduce((l,r) => (l union r).reduceByKey(_+_))
rst1和rst2得到样本结果。我认为 rst1 需要更多内存(100倍),但只有一个reduceByKey转换;然而,rst2 需要更少的内存,但需要更多的reduceByKey 转换(99 次)。那么,这是一个时间和空间权衡的游戏吗?
我的问题是:我上面的分析是否正确,或者Spark内部是否以相同的方式翻译 Action ?
P.S.: 首先union所有子rdd,然后reduceByKey,其中reduceByKey是外部reduce。 rst2的reduceByKey一一进行,其中reduceByKey是里面reduce。
最佳答案
长话短说,这两种解决方案的效率都相对较低,但第二个比第一个更糟糕。
让我们首先回答最后一个问题。对于低级 RDD API,只有两种类型的全局自动优化(相反):
- 使用显式或隐式缓存的任务结果而不是重新计算完整的谱系
- 将不需要随机播放的多个转换组合到单个
ShuffleMapStage
其他一切几乎都是定义 DAG 的顺序转换。这与限制性更强的高级 Dataset
(DataFrame
) API 形成鲜明对比,后者对转换做出特定假设并执行执行计划的全局优化。
关于你的代码。第一个解决方案的最大问题是当您应用迭代union
时,沿袭不断增长。它使得某些事情(例如故障恢复)变得昂贵,并且由于 RDD 是递归定义的,因此可能会因 StackOverflow 异常而失败。一个不太严重的副作用是分区数量不断增加,这似乎并没有在后续的减少中得到补偿*。您可以在我对Stackoverflow due to long RDD Lineage的回答中找到更详细的解释。但这里你真正需要的是一个像这样的union
:
sc.union(idList.map(id => makeRDD(id, data))).reduceByKey(_+_)
假设您应用真正的归约函数,这实际上是一个最佳解决方案。
第二种解决方案显然也遇到了同样的问题,但情况变得更糟。虽然第一种方法只需要两个阶段和一次洗牌,但这需要对每个 RDD 进行一次洗牌。由于分区数量不断增加,并且您使用默认的 HashPartitioner
,因此每条数据都必须多次写入磁盘,并且很可能通过网络多次洗牌。忽略低级计算,每条记录都会被打乱 O(N) 次,其中 N 是您合并的 RDD 数量。
关于内存使用情况,如果不了解更多有关数据分布的信息,这一点并不明显,但在最坏的情况下,第二种方法可能会表现出明显更差的行为。
如果 +
使用恒定空间,则减少的唯一要求是使用 hashmap 来存储映射端组合的结果。由于分区作为数据流进行处理,而不将完整内容读取到内存中,这意味着每个任务的总内存大小将与唯一键的数量成正比,而不是与数据量成正比。由于第二种方法需要更多任务,因此总体内存使用量将高于第一种情况。平均而言,由于数据被部分组织,它可能会稍微好一些,但不太可能补偿额外的成本。
* 如果您想了解它如何影响整体性能,您可以查看 Spark iteration time increasing exponentially when using join这是略有不同的问题,但应该让您了解为什么控制分区数量很重要。
关于scala - Spark RDD : multiple reducebykey or just once,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37693543/