scala - 这是在 RDD 上实现惰性 `take` 的合适方法吗?

标签 scala apache-spark

非常不幸的是,RDD 上的 take 是一个严格的操作而不是惰性操作,但我不会解释为什么我现在认为这是一个令人遗憾的设计。

我的问题是这是否是 RDD 的惰性 take 的合适实现。它似乎有效,但我可能遗漏了一些不明显的问题。

def takeRDD[T: scala.reflect.ClassTag](rdd: RDD[T], num: Long): RDD[T] =
  new RDD[T](rdd.context, List(new OneToOneDependency(rdd))) {
    // An unfortunate consequence of the way the RDD AST is designed
    var doneSoFar = 0L

    def isDone = doneSoFar >= num

    override def getPartitions: Array[Partition] = rdd.partitions

    // Should I do this? Doesn't look like I need to
    // override val partitioner = self.partitioner

    override def compute(split: Partition, ctx: TaskContext): Iterator[T] = new Iterator[T] {
      val inner = rdd.compute(split, ctx)

      override def hasNext: Boolean = !isDone && inner.hasNext

      override def next: T = {
        doneSoFar += 1
        inner.next
      }
    }
  }

最佳答案

回答你的问题

不,这行不通。没有办法让一个变量可以在 Spark 集群中同时看到和更新,而这正是您要使用 doneSoFar 作为。如果您尝试这样做,那么当您运行 compute(跨多个节点并行)时,您将

a) 序列化任务中的 takeRDD,因为您引用了类变量 doneSoFar。这意味着您将类写入字节并在每个 JVM(执行程序)中创建一个新实例

b) 在计算中更新doneSoFar,这会更新每个执行程序 JVM 上的本地实例。您将从每个分区中获取数量等于 num 的元素。

由于那里的某些 JVM 属性,这可能会在 Spark 本地模式下工作,但在集群模式下运行 Spark 时肯定不会工作。

为什么take是一个 Action ,而不是转换

RDD 是分布式的,因此子集化为精确数量的元素是一个低效的操作——它不能完全并行完成,因为每个分片都需要关于其他分片的信息(比如是否应该计算它). Take 非常适合将分布式数据带回本地内存。

rdd.sample 是分布式世界中的类似操作,可以轻松并行运行。

关于scala - 这是在 RDD 上实现惰性 `take` 的合适方法吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40918222/

相关文章:

java - Source.fromResource() 在本地工作,但在服务器上抛出 java.nio.charset.MalformedInputException

scala - 如何在嵌套案例类中查找和修改字段?

apache-spark - 使 Python 字典可用于所有 spark 分区

scala - Spark zipWithIndex 在并行实现中是否安全?

scala - Scala 函数定义中的多个参数子句有何意义?

scala - q"null"的类型是 Literal,因此对于返回类型 Expr[X] 无效

algorithm - 如何从递归函数中获取终止原因?

python - 通过 Jenkins 运行时在 docker 中产生 "java gateway process exited before sending the driver its port number"

apache-spark - 从另一个 pod 内通过 "spark-on-k8s-operator"提交 Spark 作业

mysql - 来自另一个 SQL 的列的子字符串