Scala Spark RDD、数据集、RDD 对和分区

标签 scala apache-spark rdd apache-spark-dataset

在 Scala Spark 中,有多种方法可以导致数据分区/重新分区。其中包括partitionBy、coalesce、repartition 和textFile 以及其他以分区计数作为参数的函数。下面,我使用 textFile 并指定至少 8 个分区。我不希望通过转换来撤消这些分区。要保留分区,您需要持久分区结果。但是,mapflatMap 等函数不会保留分区。我相信这会对性能产生影响。 PairRDDS 具有维护分区的 mapValues 和 flatMapValues

map 和 flatMapDataSets 和 RDDs 是否有等效的函数,并且不会搞乱分区?

如果我把这一切都搞混了,RDD 和 DataSet 如何维护分区,并记住 map 和 flatMap 操作是其操作的关键。

val tweets:RDD[Tweet] = mySpark.sparkContext.textFile(path,8).map(parseTweet).persist()
val numerical_fields_Tweets:Dataset[Tweet] = tweets.toDS()

Below is a screenshot from a youtube video stating that a map in a pairRDD results in an RDD without a partitioner

最佳答案

在 Spark 中,不重新分区/打乱数据的操作会保留分区(通过对先前建立的分区进行操作)。 mapflatMap 就是这样的操作:它们不会改变分区的数量。此外,map 不会更改分区内的行数或其顺序。

how do RDDs and DataSets maintain there partitions

您混合了两个概念:(1) 与数据转换中某个点关联的分区器,以及 (2) 数据被分割成的分区。

数据的分区方式与与数据关联的分区程序之间存在差异。如上所述,mapflatMap 不会更改分区数量,但它们不保证与数据关联的分区器。考虑 RDD 的 map:

/**
 * Return a new RDD by applying a function to all elements of this RDD.
 */
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}

MapPartitionsRDD:

private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
    var prev: RDD[T],
    f: (TaskContext, Int, Iterator[T]) => Iterator[U],
    preservesPartitioning: Boolean = false, ...)

因此,虽然 map 不会重新分区数据,但它无法保证与数据关联的分区程序,因为 map 如何修改行没有限制.

配对 RDD,即 RDD[(K, V)] 有点特殊,因为它们通常是分区操作的结果,并且如果我们使用 mapValues 而不是 map,我们可以确定分区器没有改变,因为我们没有触及“键”。

/**
 * Pass each value in the key-value pair RDD through a map function without changing the keys;
 * this also retains the original RDD's partitioning.
 */
def mapValues[U](f: V => U): RDD[(K, U)] = self.withScope {
  val cleanF = self.context.clean(f)
  new MapPartitionsRDD[(K, U), (K, V)](self,
    (context, pid, iter) => iter.map { case (k, v) => (k, cleanF(v)) },
    preservesPartitioning = true)
}

希望这有帮助!

关于Scala Spark RDD、数据集、RDD 对和分区,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54319415/

相关文章:

scala - sbt 程序集合并问题 [去重 : different file contents found in the following]

java - ConfigFactory ParseFile 通过 Java 系统属性的变量替换进行解析

scala - 是否可以(以及如何)使用spark-submit在命令行上指定sql查询

java - Spark 转换和 Action 的逻辑是否需要线程安全?

scala - 使用 Apache Spark 将 MongoDB 数据保存为 parquet 文件格式

database - 提升连接池?

apache-spark - Spark 知道 DataFrame 的分区键吗?

scala - spark 2.0 用 json 读取 csv

返回多个 RDD 的 Java Spark 映射步骤

performance - Apache Spark : map vs mapPartitions?