apache-spark - 为什么在 Spark 中需要折叠 Action ?

标签 apache-spark pyspark rdd reduce fold

我有一个愚蠢的问题,涉及 fold并减少 PySpark .我理解这两种方法之间的区别,但是,如果两者都需要应用函数是可交换的幺半群,我无法找出一个例子,其中 fold cannot be substituted by减少`。

此外,在 fold 的 PySpark 实现中它用于 acc = op(obj, acc) ,为什么用这个操作顺序而不是acc = op(acc, obj) ? (这个二阶对我来说听起来更接近 leftFold)

干杯

托马斯

最佳答案

空 RDD

RDD 时不能被替换是空的:

val rdd = sc.emptyRDD[Int]
rdd.reduce(_ + _)
// java.lang.UnsupportedOperationException: empty collection at   
// org.apache.spark.rdd.RDD$$anonfun$reduce$1$$anonfun$apply$ ...

rdd.fold(0)(_ + _)
// Int = 0

你当然可以结合 reduce条件为 isEmpty但它相当丑陋。

可变缓冲区

fold 的另一个用例是使用可变缓冲区进行聚合。考虑以下 RDD:
import breeze.linalg.DenseVector

val rdd = sc.parallelize(Array.fill(100)(DenseVector(1)), 8)

假设我们想要所有元素的总和。一个简单的解决方案是简单地减少 + :
rdd.reduce(_ + _)

不幸的是,它为每个元素创建了一个新向量。由于对象创建和随后的垃圾收集很昂贵,因此使用可变对象可能会更好。 reduce 不可能(RDD 的不变性并不意味着元素的不变性),但可以通过 fold 实现如下:
rdd.fold(DenseVector(0))((acc, x) => acc += x)

此处使用零元素作为可变缓冲区,每个分区初始化一次,不影响实际数据。

acc = op(obj, acc), why this operation order is used instead of acc = op(acc, obj)



SPARK-6416SPARK-7683

关于apache-spark - 为什么在 Spark 中需要折叠 Action ?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34529953/

相关文章:

python - 如何创建 Spark udf 将 float 插值到 INT 以及如何编写比我所做的更好的逻辑

scala - Spark : Efficient way to test if an RDD is empty

java - apache Spark JavaPairRDD 中按键排序

pyspark - 在 pyspark 中使用 groupby 时无法获取所有列

PySpark reduceByKey 多个值

python - 如何将分区保存到特定名称的文件?

python - Pyspark 通过 RDD 中的键从单个 RDD 到多个 RDD

apache-spark - spark2 + yarn - 准备 AM 容器时出现空指针异常

Hadoop:两个数据节点,但 UI 显示一个和 Spark:两个工作人员 UI 显示一个

security - 保护集群上的 Spark 作业