scala - 为什么 Spark DataFrame 转换为 RDD 需要完全重新映射?

标签 scala apache-spark

从 Spark 源代码:

/**
   * Represents the content of the Dataset as an `RDD` of `T`.
   *
   * @group basic
   * @since 1.6.0
   */
  lazy val rdd: RDD[T] = {
    val objectType = exprEnc.deserializer.dataType
    rddQueryExecution.toRdd.mapPartitions { rows =>
      rows.map(_.get(0, objectType).asInstanceOf[T])
    }
  }

https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L2972
mapPartitions可能需要与计算 RDD 一样长的时间首先..所以这使得操作如
df.rdd.getNumPartitions

非常贵。鉴于 DataFrameDataSet[Row]DataSetRDD 组成为什么需要重新映射?任何见解表示赞赏。

最佳答案

TL;DR 那是因为内部 RDD不是 RDD[Row] .

Given that a DataFrame is DataSet[Row] and a DataSet is composed of RDD's



这是一个巨大的过度简化。首先DataSet[T]并不意味着您与 T 的容器进行交互.这意味着如果您使用类似集合的 API(通常称为强类型),内部表示将被解码为 T .

内部表示是 Tungsten 内部使用的二进制格式。这种表示是内部的,可能会发生变化,而且级别太低而无法在实践中使用。

公开此数据的中间表示是 InternalRow - rddQueryExecution.toRDD实际上是RDD[InternalRow] .这种表示(有不同的实现)仍然暴露了内部类型,被认为是“弱”私有(private)的,因为 o.a.s.sql.catalyst 中的所有对象(访问没有明确限制,但 API 没有记录),而且交互起来相当棘手。

这就是解码发挥作用的地方以及为什么需要完整的“重新映射” - 将内部(通常是不安全的)对象转换为供公众使用的外部类型。

最后,to reiterate我之前的声明 - 当 getNumPartitions 时,有问题的代码不会被执行。叫做。

关于scala - 为什么 Spark DataFrame 转换为 RDD 需要完全重新映射?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54269477/

相关文章:

scala - 过滤字符串序列直到在scala中找到键的功能方法

scala - 如何编写返回 Writer[List[Int], Int] 的函数?

performance - Apache Spark : map vs mapPartitions?

apache-spark - 您如何使用 spark 和 elasticsearch-hadoop 从/写入不同的 ElasticSearch 集群?

Scala 注释列表?

scala - 类型别名中的方差注释

scala - 在 Scala foreach 循环中赋值

scala - 线程 “main” org.apache.hadoop.mapred.InvalidInputException中的异常

scala - Spark 是否优化了链式转换?

scala - Apache Spark RDD 中每个唯一键的总和值