从 Spark 源代码:
/**
* Represents the content of the Dataset as an `RDD` of `T`.
*
* @group basic
* @since 1.6.0
*/
lazy val rdd: RDD[T] = {
val objectType = exprEnc.deserializer.dataType
rddQueryExecution.toRdd.mapPartitions { rows =>
rows.map(_.get(0, objectType).asInstanceOf[T])
}
}
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L2972
mapPartitions
可能需要与计算 RDD
一样长的时间首先..所以这使得操作如df.rdd.getNumPartitions
非常贵。鉴于
DataFrame
是 DataSet[Row]
和 DataSet
由 RDD
组成为什么需要重新映射?任何见解表示赞赏。
最佳答案
TL;DR 那是因为内部 RDD
不是 RDD[Row]
.
Given that a DataFrame is
DataSet[Row]
and aDataSet
is composed of RDD's
这是一个巨大的过度简化。首先
DataSet[T]
并不意味着您与 T
的容器进行交互.这意味着如果您使用类似集合的 API(通常称为强类型),内部表示将被解码为 T
.内部表示是 Tungsten 内部使用的二进制格式。这种表示是内部的,可能会发生变化,而且级别太低而无法在实践中使用。
公开此数据的中间表示是
InternalRow
- rddQueryExecution.toRDD
实际上是RDD[InternalRow]
.这种表示(有不同的实现)仍然暴露了内部类型,被认为是“弱”私有(private)的,因为 o.a.s.sql.catalyst
中的所有对象(访问没有明确限制,但 API 没有记录),而且交互起来相当棘手。这就是解码发挥作用的地方以及为什么需要完整的“重新映射” - 将内部(通常是不安全的)对象转换为供公众使用的外部类型。
最后,to reiterate我之前的声明 - 当
getNumPartitions
时,有问题的代码不会被执行。叫做。
关于scala - 为什么 Spark DataFrame 转换为 RDD 需要完全重新映射?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54269477/