在 Scala Spark 中,有多种方法可以导致数据分区/重新分区。其中包括partitionBy、coalesce、repartition 和textFile 以及其他以分区计数作为参数的函数。下面,我使用 textFile
并指定至少 8 个分区。我不希望通过转换来撤消这些分区。要保留分区,您需要持久
分区结果。但是,map
和 flatMap
等函数不会保留分区。我相信这会对性能产生影响。 PairRDDS 具有维护分区的 mapValues 和 flatMapValues
。
map 和 flatMap
的 DataSets 和 RDDs
是否有等效的函数,并且不会搞乱分区?
如果我把这一切都搞混了,RDD 和 DataSet 如何维护分区,并记住 map 和 flatMap 操作是其操作的关键。
val tweets:RDD[Tweet] = mySpark.sparkContext.textFile(path,8).map(parseTweet).persist()
val numerical_fields_Tweets:Dataset[Tweet] = tweets.toDS()
最佳答案
在 Spark 中,不重新分区/打乱数据的操作会保留分区(通过对先前建立的分区进行操作)。 map
和 flatMap
就是这样的操作:它们不会改变分区的数量。此外,map
不会更改分区内的行数或其顺序。
how do RDDs and DataSets maintain there partitions
您混合了两个概念:(1) 与数据转换中某个点关联的分区器,以及 (2) 数据被分割成的分区。
数据的分区方式与与数据关联的分区程序之间存在差异。如上所述,map
和 flatMap
不会更改分区数量,但它们不保证与数据关联的分区器。考虑 RDD 的 map
:
/**
* Return a new RDD by applying a function to all elements of this RDD.
*/
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}
和MapPartitionsRDD
:
private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
var prev: RDD[T],
f: (TaskContext, Int, Iterator[T]) => Iterator[U],
preservesPartitioning: Boolean = false, ...)
因此,虽然 map
不会重新分区数据,但它无法保证与数据关联的分区程序,因为 map
如何修改行没有限制.
配对 RDD,即 RDD[(K, V)]
有点特殊,因为它们通常是分区操作的结果,并且如果我们使用 mapValues
而不是 map
,我们可以确定分区器没有改变,因为我们没有触及“键”。
/**
* Pass each value in the key-value pair RDD through a map function without changing the keys;
* this also retains the original RDD's partitioning.
*/
def mapValues[U](f: V => U): RDD[(K, U)] = self.withScope {
val cleanF = self.context.clean(f)
new MapPartitionsRDD[(K, U), (K, V)](self,
(context, pid, iter) => iter.map { case (k, v) => (k, cleanF(v)) },
preservesPartitioning = true)
}
希望这有帮助!
关于Scala Spark RDD、数据集、RDD 对和分区,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54319415/