scala - Spark的RangePartitioner中的sketch方法在做什么

标签 scala hash apache-spark

我正在阅读apache Spark的源代码。我陷入了范围分区器草图方法的逻辑。有人可以解释一下这段代码到底在做什么吗?

// spark/core/src/main/scala/org/apache/spark/Partitioner.scala

def sketch[K:ClassTag](rdd: RDD[K],
  sampleSizePerPartition: Int): (Long, Array[(Int, Int, Array[K])]) = {

  val shift = rdd.id
  // val classTagK = classTag[K] // to avoid serializing the entire partitioner object
  val sketched = rdd.mapPartitionsWithIndex { (idx, iter) =>
    val seed = byteswap32(idx ^ (shift << 16))
    val (sample, n) = SamplingUtils.reservoirSampleAndCount(
    iter, sampleSizePerPartition, seed)
    Iterator((idx, n, sample))
  }.collect()
  val numItems = sketched.map(_._2.toLong).sum
  (numItems, sketched)
}

最佳答案

sketch 在 RangePartitioner 中用于对 RDD 分区中的值进行采样。也就是说,从每个 RDD 分区中均匀、随机地挑选和收集元素值的小子集。

请注意,sketch 用作 RangePartitioner 的一部分 - 计算出生成的大小大致相等的分区的范围边界。其他 RangePartitioner 代码中还发生了其他很酷的事情 - 即当它计算样本子集所需的大小 (sampleSizePerPartition) 时。

请参阅我的评论作为代码的一部分,以获取逐步说明。

def sketch[K:ClassTag](rdd: RDD[K],
  sampleSizePerPartition: Int): (Long, Array[(Int, Int, Array[K])]) = {

  val shift = rdd.id
  // val classTagK = classTag[K] // to avoid serializing the entire partitioner object
  // run sampling function on every partition
  val sketched = rdd.mapPartitionsWithIndex { (idx, iter) =>
    // partition number `idx` - and rdd.id are used to calculate unique seed for every partition - to ensure that elements are selected in unique manner for every parition
    val seed = byteswap32(idx ^ (shift << 16))
    // randomly select sample of n elements and count total number of elements in partition
    // what is cool about Reservoir Sampling - that it does it in a single pass - O(N) where N is number of elements in partition
    // see more http://en.wikipedia.org/wiki/Reservoir_sampling
    val (sample, n) = SamplingUtils.reservoirSampleAndCount(
    iter, sampleSizePerPartition, seed)
    Iterator((idx, n, sample))
  }.collect()
  val numItems = sketched.map(_._2.toLong).sum
  // returns total count of elements in RDD and samples
  (numItems, sketched)
}

关于scala - Spark的RangePartitioner中的sketch方法在做什么,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25481622/

相关文章:

scala - "-"中的 "+"和 `trait Function2[-T1, -T2, +R] extends AnyRef` 的解释

hash - 在 Redis 中交叉巨大的 HyperLogLogs 的最佳方法

c++ - 为什么在计算多项式时霍纳斯方法没有溢出

apache-spark - 如何获取 Spark 朴素贝叶斯分类器中类的概率?

hadoop - Spark 作业在 Yarn 集群上运行 java.io.FileNotFoundException : File does not exits ,,即使文件存在于主节点上

configuration - 错误 YarnClientSchedulerBackend : Asked to remove non-existent executor 21

javascript - 创建一个 Axios 帖子以使用 Dropzone 发送上传的文件,以便 Scala 函数处理相应的请求

scala - 通过名称(字符串)动态创建对象

scala - 如何理解以下 scala 调用

java - 为什么hashmap没有ArrayList那样的ensureCapacity()方法?