apache-spark - 需要像 "def groupByKeyWithRDD(partitioner: Partitioner): RDD[(K, RDD[V])] = ???"这样的东西

标签 apache-spark rdd

在我们使用 groupByKey(...): RDD[(K, Iterable[V]] 的用例中,可能存在这样的情况:即使对于单个键(尽管是极端情况),关联的 Iterable[ V] 可能会导致 OOM。

是否可以提供上述“groupByKeyWithRDD”?

并且,理想情况下,如果 RDD[V] 的内部实现足够智能,仅在配置的阈值时将数据溢出到磁盘中,那就太好了。这样,我们也不会牺牲正常情况下的性能。

欢迎任何建议/意见。非常感谢!

附注:我们确实理解这里提到的要点:https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html ,而“reduceByKey”、“foldByKey”目前还不太符合我们的需求,也就是说,我们无法真正避免“groupByKey”。

最佳答案

假设 #(of-unique-keys) << #(key-value-pairs)(看起来确实如此),则应该不需要 RDD[(K, RDD[V])] 。相反,您可以转换为 Map[(K, RDD[V])]通过使用过滤器映射唯一键:

import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD

def splitByKey[K : ClassTag, V: ClassTag](rdd: RDD[(K, V)]): Map[K, RDD[V]] = {
  val keys = rdd.keys.distinct.collect.toSeq
  keys.map(key => (key -> rdd.filter{case (k, _) => k == key}.values)).toMap
}

它需要对数据进行多次扫描,因此它并不便宜,但不需要洗牌,可以更好地控制缓存,并且只要初始 RDD 适合内存,就不太可能导致 OOM。

关于apache-spark - 需要像 "def groupByKeyWithRDD(partitioner: Partitioner): RDD[(K, RDD[V])] = ???"这样的东西,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33706854/

相关文章:

apache-spark - spark的哪些操作是并行处理的?

scala - 基于spark中的模式匹配加载文件

scala - 为什么 Spark Cassandra 连接器因 NoHostAvailableException 而失败?

java - 使用 Maven 时出现 Apache Spark 错误

python - 如何在spark中将rdd数据一分为二?

python - 如何从 RDD[PYSPARK] 中删除重复值

python - 如何在 PySpark 中比较两个 LabeledPoint?

scala - 如何实现 Functor[数据集]

python - 在 Spark 数据框中生成可重复的唯一 ID

apache-spark - Spark Join 在列中返回空值