我是 Spark 和 Scala 新手。我对 Spark 中的 reduceByKey 函数的工作方式感到困惑。假设我们有以下代码:
val lines = sc.textFile("data.txt")
val pairs = lines.map(s => (s, 1))
val counts = pairs.reduceByKey((a, b) => a + b)
map 函数很清楚:s 是键,它指向 data.txt
中的行,1 是值。
但是,我不明白reduceByKey内部是如何工作的? “a”是否指向 key ?或者,“a”是否指向“s”?那么a+b代表什么呢?它们是如何填充的?
最佳答案
让我们将其分解为离散的方法和类型。这通常会暴露新开发人员的复杂性:
pairs.reduceByKey((a, b) => a + b)
变成
pairs.reduceByKey((a: Int, b: Int) => a + b)
重命名变量使其更加明确
pairs.reduceByKey((accumulatedValue: Int, currentValue: Int) => accumulatedValue + currentValue)
因此,我们现在可以看到,我们只是为给定键获取累积值,并将其与该键的下一个值相加。现在,让我们进一步分解它,以便我们能够理解关键部分。因此,让我们将该方法形象化,如下所示:
pairs.reduce((accumulatedValue: List[(String, Int)], currentValue: (String, Int)) => {
//Turn the accumulated value into a true key->value mapping
val accumAsMap = accumulatedValue.toMap
//Try to get the key's current value if we've already encountered it
accumAsMap.get(currentValue._1) match {
//If we have encountered it, then add the new value to the existing value and overwrite the old
case Some(value : Int) => (accumAsMap + (currentValue._1 -> (value + currentValue._2))).toList
//If we have NOT encountered it, then simply add it to the list
case None => currentValue :: accumulatedValue
}
})
因此,您可以看到,reduceByKey 采用查找 key 并跟踪它的样板,这样您就不必担心管理该部分。
如果你愿意,可以更深入、更真实
尽管如此,这只是所发生情况的简化版本,因为这里进行了一些优化。此操作是关联的,因此 Spark 引擎将首先在本地执行这些缩减(通常称为映射端缩减),然后在驱动程序处再次执行。这样可以节省网络流量;它可以将数据减少到尽可能小,然后通过网络发送该减少量,而不是发送所有数据并执行操作。
关于scala - 按键减少 : How does it work internally?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30145329/