scala - RDD中是否有任何 Action 保持顺序?

标签 scala apache-spark rdd reduce fold

我想要像 RDD 那样的 reduce 性能 Action ,但不需要运算符是可交换的。即我希望接下来的 result 始终是 "123456789"

scala> val rdd = sc.parallelize(1 to 9 map (_.toString))
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[24] at parallelize at <console>:24

scala> val result = rdd.someAction{ _+_ }

首先,我找到了 foldRDD#fold 的文档说:

def fold(zeroValue: T)(op: (T, T) ⇒ T): T Aggregate the elements of each partition, and then the results for all the partitions, using a given associative function and a neutral "zero value"



请注意,文档中不需要 可交换的 。然而,结果并不如预期:
scala> rdd.fold(""){ _+_ }
res10: String = 312456879

EDIT 我已经尝试过@dk14 提到的,但没有运气:
scala> val rdd = sc.parallelize(1 to 9 map (_.toString))
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[48] at parallelize at <console>:24

scala> rdd.zipWithIndex.map(x => (x._2, x._1)).sortByKey().map(_._2).fold(""){ _+_ }
res22: String = 341276895

scala> rdd.zipWithIndex.map(x => (x._2, x._1)).sortByKey().map(_._2).fold(""){ _+_ }
res23: String = 914856273

scala> rdd.zipWithIndex.map(x => (x._2, x._1)).sortByKey().map(_._2).fold(""){ _+_ }
res24: String = 742539618

scala> rdd.zipWithIndex.map(x => (x._2, x._1)).sortByKey().map(_._2).fold(""){ _+_ }
res25: String = 271468359

最佳答案

Scala 中没有满足此标准的内置减少操作,但是您可以通过组合 mapPartitions 轻松实现自己的减少操作。 , collect和局部减少:

import scala.reflect.ClassTag

def orderedFold[T : ClassTag](rdd: RDD[T])(zero: T)(f: (T, T) => T): T = {
  rdd.mapPartitions(iter => Iterator(iter.foldLeft(zero)(f))).collect.reduce(f)
}

使用 collect 的组合和 reduce用于合并而不是 fold 使用的异步和无序方法确保保留全局顺序。

这当然会带来一些额外的费用,包括:
  • 驱动程序的内存占用略高。
  • 显着更高的延迟 - 我们在开始本地减少之前明确等待所有任务完成。
  • 关于scala - RDD中是否有任何 Action 保持顺序?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38781320/

    相关文章:

    scala - spark数据帧爆炸功能错误

    scala - 什么是 scala 证据参数

    java - spark 中的 cache() 是改变 RDD 的状态还是创建一个新的?

    apache-spark - Spark中的RDD依赖是什么?

    scala - Spark Streaming 中的分区是如何工作的?

    scala - 替代模型scala

    scala - SBT 如何运行 InputTask

    scala - 配置 Spark 写入 HDFS 的 Avro 文件大小

    scala - 如何使用scala从spark中的RDD获取值

    python - 如何在 Python 中创建示例单列 Spark DataFrame?