我有一个太大的 RDD,无法在没有虚假错误的情况下持续执行不同的语句(例如 SparkException 阶段失败 4 次,ExecutorLostFailure,HDFS 文件系统关闭,达到最大执行器失败次数,由于 SparkContext 关闭而取消阶段,等等)
我正在尝试计算特定列中的不同 ID,例如:
print(myRDD.map(a => a._2._1._2).distinct.count())
是否有一种简单、一致、较少 shuffle 密集型的方法来执行上述命令,可能使用 mapPartitions、reduceByKey、flatMap 或其他使用较少 shuffle 的命令而不是 distinct ?
另见 What are the Spark transformations that causes a Shuffle?
最佳答案
弄清楚是否存在另一个潜在问题可能会更好,但下面的内容将满足您的需求……而不是方法,但听起来它符合您的要求:
myRDD.map(a => (a._2._1._2, a._2._1._2))
.aggregateByKey(Set[YourType]())((agg, value) => agg + value, (agg1, agg2) => agg1 ++ agg2)
.keys
.count
甚至这似乎有效,但它不是关联和可交换的。它的工作原理取决于 Spark 的内部工作原理......但我可能会遗漏一个案例......所以虽然更简单,但我不确定我是否相信它:
myRDD.map(a => (a._2._1._2, a._2._1._2))
.aggregateByKey(YourTypeDefault)((x,y)=>y, (x,y)=>x)
.keys.count
关于scala - 有没有办法重写 Spark RDD distinct 以使用 mapPartitions 而不是 distinct?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31082066/