scala - 有没有办法重写 Spark RDD distinct 以使用 mapPartitions 而不是 distinct?

标签 scala apache-spark distinct shuffle rdd

我有一个太大的 RDD,无法在没有虚假错误的情况下持续执行不同的语句(例如 SparkException 阶段失败 4 次,ExecutorLostFailure,HDFS 文件系统关闭,达到最大执行器失败次数,由于 SparkContext 关闭而取消阶段,等等)

我正在尝试计算特定列中的不同 ID,例如:

print(myRDD.map(a => a._2._1._2).distinct.count())

是否有一种简单、一致、较少 shuffle 密集型的方法来执行上述命令,可能使用 mapPartitions、reduceByKey、flatMap 或其他使用较少 shuffle 的命令而不是 distinct ?

另见 What are the Spark transformations that causes a Shuffle?

最佳答案

弄清楚是否存在另一个潜在问题可能会更好,但下面的内容将满足您的需求……而不是方法,但听起来它符合您的要求:

myRDD.map(a => (a._2._1._2, a._2._1._2))
  .aggregateByKey(Set[YourType]())((agg, value) => agg + value, (agg1, agg2) => agg1 ++ agg2) 
  .keys
  .count

甚至这似乎有效,但它不是关联和可交换的。它的工作原理取决于 Spark 的内部工作原理......但我可能会遗漏一个案例......所以虽然更简单,但我不确定我是否相信它:
myRDD.map(a => (a._2._1._2, a._2._1._2))
  .aggregateByKey(YourTypeDefault)((x,y)=>y, (x,y)=>x)
  .keys.count

关于scala - 有没有办法重写 Spark RDD distinct 以使用 mapPartitions 而不是 distinct?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31082066/

相关文章:

java - transient 变量如何在 Worker 上可用

SQL查询在两个表中查找不同的值?

java - 为什么 ScheduledThreadPoolExecutor 的队列大小始终为 0?

Scala - 从 ISO-8859-1 转换为 UTF-8 会产生陌生字符

scala - Play 2.7 中的 RequestScoped

hadoop - 在hadoop中解析Spark驱动程序主机时出现错误

hadoop - Spark/Hadoop 无法读取根文件

MongoDB 不同的聚合

MySQL 选择不同的问题

scala - RDD 映射中的 Spark Scala 序列化错误