scala - Spark RDD : multiple reducebykey or just once

标签 scala performance apache-spark rdd

我有如下代码:

// make a rd according to an id
def makeRDD(id:Int, data:RDD[(VertexId, Double)]):RDD[(Long, Double)] = { ... }  
val data:RDD[(VertexId, Double)] = ... // loading from hdfs
val idList = (1 to 100)
val rst1 = idList.map(id => makeRDD(id, data)).reduce(_ union _).reduceByKey(_+_)
val rst2 = idList.map(id => makeRDD(id, data)).reduce((l,r) => (l union r).reduceByKey(_+_))

rst1和rst2得到样本结果。我认为 rst1 需要更多内存(100倍),但只有一个reduceByKey转换;然而,rst2 需要更少的内存,但需要更多的reduceByKey 转换(99 次)。那么,这是一个时间和空间权衡的游戏吗?

我的问题是:我上面的分析是否正确,或者Spark内部是否以相同的方式翻译 Action ?

P.S.: 首先union所有子rdd,然后reduceByKey,其中reduceByKey是外部reduce。 rst2的reduceByKey一一进行,其中reduceByKey是里面reduce。

最佳答案

长话短说,这两种解决方案的效率都相对较低,但第二个比第一个更糟糕。

让我们首先回答最后一个问题。对于低级 RDD API,只有两种类型的全局自动优化(相反):

  • 使用显式或隐式缓存的任务结果而不是重新计算完整的谱系
  • 将不需要随机播放的多个转换组合到单个 ShuffleMapStage

其他一切几乎都是定义 DAG 的顺序转换。这与限制性更强的高级 Dataset (DataFrame) API 形成鲜明对比,后者对转换做出特定假设并执行执行计划的全局优化。

关于你的代码。第一个解决方案的最大问题是当您应用迭代union 时,沿袭不断增长。它使得某些事情(例如故障恢复)变得昂贵,并且由于 RDD 是递归定义的,因此可能会因 StackOverflow 异常而失败。一个不太严重的副作用是分区数量不断增加,这似乎并没有在后续的减少中得到补偿*。您可以在我对Stackoverflow due to long RDD Lineage的回答中找到更详细的解释。但这里你真正需要的是一个像这样的union:

sc.union(idList.map(id => makeRDD(id, data))).reduceByKey(_+_)

假设您应用真正的归约函数,这实际上是一个最佳解决方案。

第二种解决方案显然也遇到了同样的问题,但情况变得更糟。虽然第一种方法只需要两个阶段和一次洗牌,但这需要对每个 RDD 进行一次洗牌。由于分区数量不断增加,并且您使用默认的 HashPartitioner,因此每条数据都必须多次写入磁盘,并且很可能通过网络多次洗牌。忽略低级计算,每条记录都会被打乱 O(N) 次,其中 N 是您合并的 RDD 数量。

关于内存使用情况,如果不了解更多有关数据分布的信息,这一点并不明显,但在最坏的情况下,第二种方法可能会表现出明显更差的行为。

如果 + 使用恒定空间,则减少的唯一要求是使用 hashmap 来存储映射端组合的结果。由于分区作为数据流进行处理,而不将完整内容读取到内存中,这意味着每个任务的总内存大小将与唯一键的数量成正比,而不是与数据量成正比。由于第二种方法需要更多任务,因此总体内存使用量将高于第一种情况。平均而言,由于数据被部分组织,它可能会稍微好一些,但不太可能补偿额外的成本。


* 如果您想了解它如何影响整体性能,您可以查看 Spark iteration time increasing exponentially when using join这是略有不同的问题,但应该让您了解为什么控制分区数量很重要。

关于scala - Spark RDD : multiple reducebykey or just once,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37693543/

相关文章:

scala - Spark : how to run spark file from spark shell

java - 如果一个对象不再发生变化,它是否可以安全地跨线程共享? (斯卡拉/ java )

algorithm - 如果前缀表全为零,KMP 的性能如何?

performance - 如何衡量由许多子基准组成的基准的可变性?

python - 将位串转换为字节(霍夫曼编码)

python - 使用sparkConf().set设置2个配置值

java - 运行简单的 twitter 情绪分析代码时获取不存在的 jar 和 java.lang.ClassNotFoundException

java - 使用 glDrawArrays 绘图工作正常,但使用 glDrawElements 则失败

scala - 限制组合隐式参数和 View /上下文边界的原因是什么?

python - Spark : Broadcast variables: It appears that you are attempting to reference SparkContext from a broadcast variable, Action ,或转换