scala - 使用ReduceByKey 对值列表进行分组

标签 scala hadoop apache-spark mapreduce apache-spark-sql

我想对每个键的值列表进行分组,并且正在执行以下操作:

sc.parallelize(Array(("red", "zero"), ("yellow", "one"), ("red", "two"))).groupByKey().collect.foreach(println)

(red,CompactBuffer(zero, two))
(yellow,CompactBuffer(one))

但我注意到 Databricks 的一篇博客文章,建议不要对大型数据集使用 groupByKey。

Avoid GroupByKey

有没有办法使用reduceByKey达到相同的结果?

我尝试过这个,但它连接了所有值。顺便说一句,就我而言,键和值都是字符串类型。

sc.parallelize(Array(("red", "zero"), ("yellow", "one"), ("red", "two"))).reduceByKey(_ ++ _).collect.foreach(println)

(red,zerotwo)
(yellow,one)

最佳答案

使用aggregateByKey:

 sc.parallelize(Array(("red", "zero"), ("yellow", "one"), ("red", "two")))
.aggregateByKey(ListBuffer.empty[String])(
        (numList, num) => {numList += num; numList},
         (numList1, numList2) => {numList1.appendAll(numList2); numList1})
.mapValues(_.toList)
.collect()

scala> Array[(String, List[String])] = Array((yellow,List(one)), (red,List(zero, two)))

参见this answer有关aggregateByKey的详细信息,this link了解使用可变数据集 ListBuffer 背后的基本原理。

编辑:

有没有办法使用reduceByKey达到相同的结果?

上面的方法实际上性能较差,详情请参阅@zero323的评论。

关于scala - 使用ReduceByKey 对值列表进行分组,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37580303/

相关文章:

scala - 为什么 Scala 案例类生成两个应用方法?

hadoop - 我想通过mahout mapreduce工作获取距离矩阵

hadoop - 如何运行从 hdfs 到 s3 的加密 distcp?

scala - Haskell scala 互操作性

scala - 如何使用IntelliJ Idea创建SBT项目?

scala - Scala actor 可以同时处理多个消息吗?

database - 解析维基百科页面链接数据集

hadoop - 无法从 IDEA 连接到资源管理器

scala - 具有特征的 Spark 2.0 数据集编码器

apache-spark - 自定义 log4j 类不适用于 spark 2.0 EMR