java - Spark - 如何保持 JavaPairRDD 中分组值数量的最大限制

标签 java apache-spark bigdata rdd

我有一个像这样的RDD:

JavaPairRDD<String, String> 

有很多条目,并且某些键重复了很多次。当我应用groupByKeycombineByKey时,它会生成另一个

JavaPairRDD<String, Iterable<String>

问题是,对于某些键集,值的数量非常巨大(因为特定键是倾斜的)。这会导致进一步的下游消耗问题,甚至产生内存问题。

我的问题是如何限制每个键聚合的值的数量。我想按键分组,但值列表不应超出限制 X 数。任何溢出的值都应该添加到新行中,有办法做到这一点吗?

最佳答案

这可以使用flatMap来解决。我不知道如何用 Java 编写它,但是希望您可以转换 Scala 代码。带有示例输入的代码:

val rdd = spark.sparkContext.parallelize(Seq((1, Iterable(1,2,3,4,5)), (2, Iterable(6,7,8)), (3, Iterable(1,3,6,8,4,2,7,8,3))))

val maxLength = 3
val res = rdd.flatMap{ case(id, vals) =>
  vals.grouped(maxLength).map(v => (id, v))
}

这个想法是将列表拆分为列表列表,其中每个内部列表都有最大长度。由于这里使用了 flatMap ,因此列表的列表将被展平为一个简单的列表,这就是您想要的结果。使用最大长度 3 并打印 res 给出:

(1,List(1, 2, 3))
(1,List(4, 5))
(2,List(6, 7, 8))
(3,List(1, 3, 6))
(3,List(8, 4, 2))
(3,List(7, 8, 3))

关于java - Spark - 如何保持 JavaPairRDD 中分组值数量的最大限制,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48373786/

相关文章:

java - 此类中 getter/setter 的用途

python - 如何替换 spark 数据框所有列中的多个字符?

scala - 为什么 Spark/Scala 编译器无法在 RDD[Map[Int, Int]] 上找到 toDF?

mysql - 打开巨大的mySQL数据库

java - 如何从命令行界面检查 Apache Phoenix 的版本?

multithreading - 多个小的 h5 文件或一个巨大的文件之间最好的是什么?

java - 将 Mockito 与 Kotlin 一起使用时,如何绕过 any() 不得为空?

java - 如何仅在线程未执行或之前未启动或仅应创建一个线程实例时启动该线程

java - 是否可以在收到请求正文之前拦截请求

apache-spark - 如何在 Spark SQL 的 WITH 子句中缓存子查询结果