scala - Spark - 按键分组,然后按值计数

标签 scala apache-spark mapreduce rdd key-value

我有使用 RDD Array[String]

中的 map 函数创建的非唯一键值对
val kvPairs = myRdd.map(line => (line(0), line(1)))

这会产生以下格式的数据:

1, A
1, A
1, B
2, C

我想按它们的值对所有键进行分组,并提供这些值的计数,如下所示:

1, {(A, 2), (B, 1)}
2, {(C, 1)}

我尝试了很多不同的尝试,但我能得到的最接近的是这样的:

kvPairs.sortByKey().countByValue()

这给了

1, (A, 2)
1, (B, 1)
2, (C, 1)

还有,

kvPairs.groupByKey().sortByKey()

提供值(value),但还不够:

1, {(A, A, B)}
2, {(C)}

我尝试将两者结合在一起:

kvPairs.countByValue().groupByKey().sortByKey()

但这会返回错误

error: value groupByKey is not a member of scala.collection.Map[(String, String),Long]

最佳答案

直接数对数,然后分组(如果必须的话):

kvPairs.map((_, 1L))
  .reduceByKey(_ + _)
  .map{ case ((k, v), cnt) => (k, (v, cnt)) }
  .groupByKey

如果你想 gropuByKey 在减少后你可能想使用自定义分区器,它只考虑键的第一个元素。您可以查看RDD split and do aggregation on new RDDs示例实现。

关于scala - Spark - 按键分组,然后按值计数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35763284/

相关文章:

hadoop - 有关使用Hive和 “(CDH 4.2.0), yarn (Hadoop 2.4.0)”构建Spark的信息?

amazon-web-services - 在 AWS EMR 集群中哪里可以找到节点日志?

hadoop - hadoop 中的数据包计数(使用 Mapreduce)

Scala 抽象类

mysql - 连接到 MySql 的 Scala Play 显示运行时错误 "Table ' token '不存在”

list - 更新 List 的最后一个元素

java - 仅限 map 的工作 - 订单

scala - 不可变泛型类的特定于类型的函数

java - Spark 历史日志手动解压

hadoop - 使用Hadoop MapReduce从文本文件中的列检索唯一结果