从 spark 2.0.1 开始我有一些问题。我阅读了很多文档,但到目前为止找不到足够的答案:
df.select("foo")
df.select($"foo")
myDataSet.map(foo.someVal)
是类型安全的,不会转换为 RDD
但保持数据集表示/没有额外开销(2.0.0 的性能明智)df.select("foo")
没有 map 语句的类型安全?最佳答案
df.select("foo")
之间的区别和 df.select($"foo")
是签名。前者至少需要一个String
, 后一个零个或多个 Columns
.除此之外没有实际区别。 myDataSet.map(foo.someVal)
类型检查,但与任何 Dataset
一样操作使用 RDD
对象,并与 DataFrame
相比操作,有很大的开销。我们来看一个简单的例子:case class FooBar(foo: Int, bar: String)
val ds = Seq(FooBar(1, "x")).toDS
ds.map(_.foo).explain
== Physical Plan ==
*SerializeFromObject [input[0, int, true] AS value#123]
+- *MapElements <function1>, obj#122: int
+- *DeserializeToObject newInstance(class $line67.$read$$iw$$iw$FooBar), obj#121: $line67.$read$$iw$$iw$FooBar
+- LocalTableScan [foo#117, bar#118]
如您所见,此执行计划需要访问所有字段并且必须访问
DeserializeToObject
. ds.select($"foo").explain
== Physical Plan ==
LocalTableScan [foo#117]
与之前显示的计划相比,它可以直接访问列。与其说是 API 的限制,不如说是操作语义不同的结果。
How could I df.select("foo") type-safe without a map statement?
没有这样的选择。虽然类型化的列允许您静态转换
Dataset
进入另一个静态类型Dataset
:ds.select($"bar".as[Int])
没有类型安全。还有一些其他尝试包括类型安全优化操作,like typed aggregations ,但是这个实验性的 API。
why should I use a UDF / UADF instead of a map
这完全取决于你。 Spark 中的每个分布式数据结构都有自己的优点和缺点(参见示例 Spark UDAF with ArrayType as bufferSchema performance issues)。
就个人而言,我发现静态类型
Dataset
是最没用的:Dataset[Row]
相同的优化范围(尽管它们共享存储格式和一些执行计划优化,但它并不能完全受益于代码生成或堆外存储)也不能访问 DataFrame
的所有分析功能. ds.groupBy("foo").agg(sum($"bar") as "bar").as[FooBar].filter(x => true).where($"foo" === 1).explain
== Physical Plan ==
*Filter (foo#133 = 1)
+- *Filter <function1>.apply
+- *HashAggregate(keys=[foo#133], functions=[sum(cast(bar#134 as double))])
+- Exchange hashpartitioning(foo#133, 200)
+- *HashAggregate(keys=[foo#133], functions=[partial_sum(cast(bar#134 as double))])
+- LocalTableScan [foo#133, bar#134]
相比:
ds.groupBy("foo").agg(sum($"bar") as "bar").as[FooBar].where($"foo" === 1).explain
== Physical Plan ==
*HashAggregate(keys=[foo#133], functions=[sum(cast(bar#134 as double))])
+- Exchange hashpartitioning(foo#133, 200)
+- *HashAggregate(keys=[foo#133], functions=[partial_sum(cast(bar#134 as double))])
+- *Filter (foo#133 = 1)
+- LocalTableScan [foo#133, bar#134]
这会影响谓词下推或投影下推等功能。
RDDs
那么灵活仅原生支持一小部分类型。 Encoders
是有争议的 Dataset
使用 as
转换方法。由于未使用签名对数据形状进行编码,因此编译器只能验证 Encoder
的存在。 . 相关问题:
关于scala - Spark 2.0 数据集与数据帧,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40596638/