scala - CombineBy Key Spark 方法

标签 scala hadoop apache-spark

下面的这段代码应该使用 combineByKey() 找到 Per-Key Average:

val result = input.combineByKey(
(v) => (v, 1),
(acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),
(acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)).
map{ case (key, value) => (key, value._1 / value._2.toFloat) }  
result.collectAsMap().map(println(_))

我对上述方法的执行感到困惑。假设我们有数据集
( (1,1), (1,3), (2,4), (2,3), (3,1) )

所以 combineByKey 的执行看起来像这样?:

1) 首先,它会创建一个带有 (1,1) 的累加器。
2) 那么当它遇到具有相同键(1) 的元组时,它会将键值加在一起吗?因此,当它遇到 (1,3) 时,键 1 的新累加器将类似于 (2,2)。由于它添加了 (1,1) 和 (1,3) 的键,并且由于有两个键为 1 的元组,它将在 ( 2,2).
3) 然后它将继续对所有相同的键执行此操作。
4) 最后,它将从每个分区中取出所有累加器,并将键(元组的左侧)和它发生的次数(元组的右侧)添加到一个元组中 key 。

抱歉,如果这有点不对,我还在习惯函数式编程方法!

最佳答案

通常情况下,通过查看方法和包含类的类型可以获得很多清晰度。

PairRDDFunctions[K, V]

def combineByKey[C]( createCombiner: (V) ⇒ C, mergeValue: (C, V) ⇒ C, mergeCombiners: (C, C) ⇒ C): RDD[(K, C)]

我们有一个包含 2 个类型参数的类,一个键和一个值,以及一个包含一个组合器的方法。

你被要求提供功能

  • 将一个值变成一个组合器
  • 将一个 Value 和一个 Combiner 变成一个 Combiner
  • 将一个 Combiner 和一个 Combiner 变成一个 Combiner

立即,这使您无法描述将键加在一起,因为我们没有提供任何对键进行操作的方法。

对于每个键:

  1. 首先,它将根据值创建 Combiner,在本例中,将值放入 tuple2 的第一个槽中,第二个槽中为 1 (1, 1)
  2. 然后它将通过将值添加到 tuple2 的第一个插槽并递增第二个插槽,将同一键的每个附加值合并到组合器中。 (1 + 3, 1 + 1) == (4, 2)
  3. 然后它将继续为同一键的所有条目执行此操作。
  4. 然后最后它将从每个分区中获取所有累加器并将值(元组的左侧)和它发生的次数(元组的右侧)添加到一个元组中每个键。

您的困惑可能源于您的键和值属于同一类型这一事实。如果您将键更改为 Strings,您的代码将编译,但如果您对值进行此操作则不会。

关于scala - CombineBy Key Spark 方法,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37149151/

相关文章:

scala - 带上限的联合类型

hadoop - 打开 IgnitePath 时如何获取 InputStream(返回 HadoopIgfsSecondaryFileSystemPositionedReadable)?

apache-spark - 解析spark-csv数据帧读取器中的微/纳秒时间戳: Inconsistent results

mysql - 使用 Scala 解析 AWS Glue 中的 MySQL 0000-00-00 日期格式

Java Spark提交: Exception thrown in awaitResult

scala - 检查 arraytype 列是否包含 null

ScalaFiddle 除以零

hadoop - 如何在不询问本地机器密码的情况下启动hadoop?

mysql - Scala/MySQL 列不能为空

hadoop - hadoop 任务管理器 UI 在哪里