scala - 按键减少 : How does it work internally?

标签 scala apache-spark rdd

我是 Spark 和 Scala 新手。我对 Spark 中的 reduceByKey 函数的工作方式感到困惑。假设我们有以下代码:

val lines = sc.textFile("data.txt")
val pairs = lines.map(s => (s, 1))
val counts = pairs.reduceByKey((a, b) => a + b)

map 函数很清楚:s 是键,它指向 data.txt 中的行,1 是值。

但是,我不明白reduceByKey内部是如何工作的? “a”是否指向 key ?或者,“a”是否指向“s”?那么a+b代表什么呢?它们是如何填充的?

最佳答案

让我们将其分解为离散的方法和类型。这通常会暴露新开发人员的复杂性:

pairs.reduceByKey((a, b) => a + b)

变成

pairs.reduceByKey((a: Int, b: Int) => a + b)

重命名变量使其更加明确

pairs.reduceByKey((accumulatedValue: Int, currentValue: Int) => accumulatedValue + currentValue)

因此,我们现在可以看到,我们只是为给定键获取累积值,并将其与该键的下一个值相加。现在,让我们进一步分解它,以便我们能够理解关键部分。因此,让我们将该方法形象化,如下所示:

pairs.reduce((accumulatedValue: List[(String, Int)], currentValue: (String, Int)) => {
  //Turn the accumulated value into a true key->value mapping
  val accumAsMap = accumulatedValue.toMap   
  //Try to get the key's current value if we've already encountered it
  accumAsMap.get(currentValue._1) match { 
    //If we have encountered it, then add the new value to the existing value and overwrite the old
    case Some(value : Int) => (accumAsMap + (currentValue._1 -> (value + currentValue._2))).toList
    //If we have NOT encountered it, then simply add it to the list
    case None => currentValue :: accumulatedValue 
  }
})

因此,您可以看到,reduceByKey 采用查找 key 并跟踪它的样板,这样您就不必担心管理该部分。

如果你愿意,可以更深入、更真实

尽管如此,这只是所发生情况的简化版本,因为这里进行了一些优化。此操作是关联的,因此 Spark 引擎将首先在本地执行这些缩减(通常称为映射端缩减),然后在驱动程序处再次执行。这样可以节省网络流量;它可以将数据减少到尽可能小,然后通过网络发送该减少量,而不是发送所有数据并执行操作。

关于scala - 按键减少 : How does it work internally?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30145329/

相关文章:

r - 如何让sparklyr::spark_apply() 产生多个worker?

Python:如果存在空值,如何将 Pyspark 列转换为日期类型

apache-spark - rdd后面的数字是什么意思

scala - 在Scala-Spark1.5.2中递归过滤RDD

scala - 编译失败 : error while loading AnnotatedElement, Scala 2.10 下 Java 8 的 ConcurrentMap、CharSequence?

scala - Play 上的代码覆盖率!项目

scala - Chisel:从测试仪访问模块参数

performance - 我在哪里可以找到 Spark 中的操作成本?

scala - 将 RDD 中的元组分解为两个元组

scala - 如何从其他包中引用 scala 枚举