scala - Spark : difference of semantics between reduce and reduceByKey

标签 scala apache-spark rdd reduce

在 Spark 的文档中,它说 RDDs 方法 reduce 需要一个关联的和可交换的二元函数。

但是,该方法 reduceByKey 只需要一个关联的二元函数。

sc.textFile("file4kB", 4)

我做了一些测试,显然这是我得到的行为。为什么会有这种差异?为什么reduceByKey确保在 reduce 时始终按特定顺序应用二元函数(以适应缺乏可交换性)才不是?

例如,如果加载一些具有 4 个分区(最少)的(小)文本:
val r = sc.textFile("file4k", 4)

然后:
r.reduce(_ + _)

返回一个字符串,其中部分的顺序并不总是相同,而:
r.map(x => (1,x)).reduceByKey(_ + _).first

始终返回相同的字符串(其中所有内容的顺序与原始文件中的顺序相同)。

(我查了r.glom,文件内容确实分布在4个分区上,没有空分区)。

最佳答案

就我而言,这是文档中的错误,您看到的结果只是偶然的。实践,other resources和一个简单的 analysis of the code显示传递给 reduceByKey 的函数不仅应该是关联的,而且还应该是可交换的。

  • 实践 - 虽然看起来顺序是在本地模式下保留的,但在集群上运行 Spark 时不再如此,包括独立模式。
  • 其他资源 - 引用 Data Exploration Using Spark来自 AmpCamp 3 :

    There is a convenient method called reduceByKey in Spark for exactly this pattern. Note that the second argument to reduceByKey determines the number of reducers to use. By default, Spark assumes that the reduce function is commutative and associative and applies combiners on the mapper side.

  • 代码 - reduceByKey使用 combineByKeyWithClassTag 实现并创建 ShuffledRDD .由于 Spark 不保证 shuffle 后的顺序,因此恢复的唯一方法是将一些元数据附加到部分减少的记录。据我所知,不会发生这样的事情。

  • 附注 reduce因为它是在 PySpark 中实现的,所以它可以与仅可交换的函数一起正常工作。这当然只是实现的一个细节,而不是契约(Contract)的一部分。

    关于scala - Spark : difference of semantics between reduce and reduceByKey,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35205107/

    相关文章:

    scala - 以编程方式检查字符串是否为 Scala 中的保留字

    Java 8 - 将 Kairosdb 中的多个对象列表保存到 csv 文件中

    apache-spark - 在 Spark SQL 中使用 Avro 模式和 Parquet 格式进行读/写

    python - 在 PySpark 上将日期时间转换为日期

    scala - Spark 映射分区带索引 : Identify a partition

    spring - 如何让 Spring 连接我的 JmsComponent

    scala - 以 None 为起始值的折叠集合

    scala - 为什么在没有任何参数的情况下调用采用 Any 的方法是合法的?

    apache-spark - 使用 Scala 转换 PySpark RDD

    java - 停止Context后如何重用spark RDD