下面的这段代码应该使用 combineByKey() 找到 Per-Key Average:
val result = input.combineByKey(
(v) => (v, 1),
(acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),
(acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)).
map{ case (key, value) => (key, value._1 / value._2.toFloat) }
result.collectAsMap().map(println(_))
我对上述方法的执行感到困惑。假设我们有数据集
( (1,1), (1,3), (2,4), (2,3), (3,1) )
。
所以 combineByKey 的执行看起来像这样?:
1) 首先,它会创建一个带有 (1,1)
的累加器。
2) 那么当它遇到具有相同键(1) 的元组时,它会将键值加在一起吗?因此,当它遇到 (1,3)
时,键 1 的新累加器将类似于 (2,2)
。由于它添加了 (1,1) 和 (1,3)
的键,并且由于有两个键为 1 的元组,它将在 ( 2,2)
.
3) 然后它将继续对所有相同的键执行此操作。
4) 最后,它将从每个分区中取出所有累加器,并将键(元组的左侧)和它发生的次数(元组的右侧)添加到一个元组中 key 。
抱歉,如果这有点不对,我还在习惯函数式编程方法!
最佳答案
通常情况下,通过查看方法和包含类的类型可以获得很多清晰度。
PairRDDFunctions[K, V]
def combineByKey[C](
createCombiner: (V) ⇒ C,
mergeValue: (C, V) ⇒ C,
mergeCombiners: (C, C) ⇒ C): RDD[(K, C)]
我们有一个包含 2 个类型参数的类,一个键和一个值,以及一个包含一个组合器的方法。
你被要求提供功能
- 将一个值变成一个组合器
- 将一个 Value 和一个 Combiner 变成一个 Combiner
- 将一个 Combiner 和一个 Combiner 变成一个 Combiner
立即,这使您无法描述将键加在一起,因为我们没有提供任何对键进行操作的方法。
对于每个键:
- 首先,它将根据值创建 Combiner,在本例中,将值放入 tuple2 的第一个槽中,第二个槽中为 1
(1, 1)
。 - 然后它将通过将值添加到 tuple2 的第一个插槽并递增第二个插槽,将同一键的每个附加值合并到组合器中。 (1 + 3, 1 + 1) == (4, 2)
- 然后它将继续为同一键的所有条目执行此操作。
- 然后最后它将从每个分区中获取所有累加器并将值(元组的左侧)和它发生的次数(元组的右侧)添加到一个元组中每个键。
您的困惑可能源于您的键和值属于同一类型这一事实。如果您将键更改为 Strings
,您的代码将编译,但如果您对值进行此操作则不会。
关于scala - CombineBy Key Spark 方法,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37149151/