scala - spark RDD折叠方法的解释

标签 scala apache-spark rdd

我正在运行为 Hadoop-2.4 预先构建的 Spark-1.4.0(在本地模式下)来计算 DoubleRDD 的平方和。我的 Scala 代码看起来像

sc.parallelize(Array(2., 3.)).fold(0.0)((p, v) => p+v*v)

它给出了一个令人惊讶的结果97.0

与 Scala 版本的 fold

相比,这非常违反直觉
Array(2., 3.).fold(0.0)((p, v) => p+v*v)

它给出了预期的答案13.0

由于缺乏理解,我似乎很可能在代码中犯了一些棘手的错误。我已经阅读了 RDD.fold() 中使用的函数应该如何进行通信,否则结果可能取决于分区等。例如,如果我将分区数更改为 1,

sc.parallelize(Array(2., 3.), 1).fold(0.0)((p, v) => p+v*v)

代码将在我的机器上为我提供 169.0!

有人能解释一下这里到底发生了什么吗?

最佳答案

嗯,official documentation 确实很好地解释了它。 :

Aggregate the elements of each partition, and then the results for all the partitions, using a given associative and commutative function and a neutral "zero value". The function op(t1, t2) is allowed to modify t1 and return it as its result value to avoid object allocation; however, it should not modify t2.

This behaves somewhat differently from fold operations implemented for non-distributed collections in functional languages like Scala. This fold operation may be applied to partitions individually, and then fold those results into the final result, rather than apply the fold to each element sequentially in some defined ordering. For functions that are not commutative, the result may differ from that of a fold applied to a non-distributed collection.

为了说明正在发生的事情,让我们尝试逐步模拟正在发生的事情:

val rdd = sc.parallelize(Array(2., 3.))

val byPartition = rdd.mapPartitions(
    iter => Array(iter.fold(0.0)((p, v) => (p +  v * v))).toIterator).collect()

它为我们提供了类似于 Array[Double] = Array(0.0, 0.0, 0.0, 4.0, 0.0, 0.0, 0.0, 9.0)

byPartition.reduce((p, v) => (p + v * v))

返回 97

需要注意的重要一点是,每次运行的结果可能会有所不同,具体取决于分区组合的顺序。

关于scala - spark RDD折叠方法的解释,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31476635/

相关文章:

apache-spark - 如何有效地找到 PySpark 数据帧中每列的 Null 和 Nan 值的计数?

scala - 如何将流程执行实现为响应式(Reactive)“Observable[String]”

scala - 根据 SPARK scala 中的条件处理 RDD

java - Spark中通过SWIFT从对象存储获取数据需要什么配置

scala - 无法通过 Elasticsearch-hadoop 库在多个 spark 节点上的 RDD 上应用映射

apache-spark - 删除rdd中的空行

scala - 如何在Scala Spark中对RDD进行排序?

java - 如何在scala代码中引用MyClass::new?

java - Apache Spark 应用程序路径中的冒号

scala - "r0"、 "r1"等在 Scala 中有特殊含义吗?