scala - 如何解释 RDD.treeAggregate

标签 scala apache-spark rdd distributed-computing

我遇到了 this line在 Apache Spark 代码源中

val (gradientSum, lossSum, miniBatchSize) = data
    .sample(false, miniBatchFraction, 42 + i)
    .treeAggregate((BDV.zeros[Double](n), 0.0, 0L))(
      seqOp = (c, v) => {
        // c: (grad, loss, count), v: (label, features)
        val l = gradient.compute(v._2, v._1, bcWeights.value, Vectors.fromBreeze(c._1))
        (c._1, c._2 + l, c._3 + 1)
      },
      combOp = (c1, c2) => {
        // c: (grad, loss, count)
        (c1._1 += c2._1, c1._2 + c2._2, c1._3 + c2._3)
      }
    )

我在阅读这篇文章时遇到了很多麻烦:
  • 首先,我在网上找不到任何可以准确解释 treeAggregate 的内容。有效,参数的含义是什么。
  • 二、这里.treeAggregate方法名称后面似乎有两个 ()() 。那是什么意思?这是一些我不明白的特殊 Scala 语法。
  • 最后,我看到 seqOp 和 comboOp 都返回一个 3 元素元组,它与预期的左侧变量相匹配,但实际上返回的是哪个?

  • 这个说法一定很先进。我无法开始破译这个。

    最佳答案

    treeAggregateaggregate 的专门实现迭代地将组合函数应用于分区的子集。这样做是为了防止将所有部分结果返回给驱动程序,因为经典的 aggregate 将发生单遍减少。确实。

    出于所有实际目的,treeAggregate遵循与 aggregate 相同的原则在这个答案中解释:Explain the aggregate functionality in Python除了它需要一个额外的参数来指示部分聚合级别的深度。

    让我试着具体解释一下这里发生了什么:

    对于聚合,我们需要一个零、一个组合器函数和一个化简函数。aggregate用途 currying独立于 combine 和 reduce 函数指定零值。

    然后我们可以像这样剖析上面的函数。希望这有助于理解:

    val Zero: (BDV, Double, Long) = (BDV.zeros[Double](n), 0.0, 0L)
    val combinerFunction: ((BDV, Double, Long), (??, ??)) => (BDV, Double, Long)  =  (c, v) => {
            // c: (grad, loss, count), v: (label, features)
            val l = gradient.compute(v._2, v._1, bcWeights.value, Vectors.fromBreeze(c._1))
            (c._1, c._2 + l, c._3 + 1)
    val reducerFunction: ((BDV, Double, Long),(BDV, Double, Long)) => (BDV, Double, Long) = (c1, c2) => {
            // c: (grad, loss, count)
            (c1._1 += c2._1, c1._2 + c2._2, c1._3 + c2._3)
          }
    

    然后我们可以重写对 treeAggregate 的调用以更易消化的形式:
    val (gradientSum, lossSum, miniBatchSize) = treeAggregate(Zero)(combinerFunction, reducerFunction)
    

    这种形式会将结果元组“提取”为命名值 gradientSum, lossSum, miniBatchSize以供进一步使用。

    请注意 treeAggregate需要一个额外的参数 depth 声明为默认值 depth = 2 ,因此,由于此特定调用中未提供它,因此它将采用该默认值。

    关于scala - 如何解释 RDD.treeAggregate,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29860635/

    相关文章:

    sql - PySpark - 如何使用连接更新 Dataframe?

    python - 使用 groupby 或 aggregate 合并 RDD 或 DataFrame 中每个事务中的项目来做 FP-growth

    scala - 运行 Typesafe Console/Atmos 来监控 actor 系统/scala 应用程序。从 IntelliJ IDEA 或任何其他 IDE 运行

    scala - Scala 或 Jython 中多方法的替代方案

    scala - 动态合并 Akka 流

    hadoop - Spark 应用程序报告内存不足的 Oozie 工作流

    scala - SBT生成的Docker容器无法打包子项目

    apache-spark - 计算给定日期范围内pyspark窗口中的行数

    scala - 在分区数据上运行 groupByKey/reduceByKey,但使用不同的键

    python - pyspark从RDD中过滤列表