apache-spark - 与RDD相比,DataSet的性能优势

标签 apache-spark rdd apache-spark-dataset

在阅读了几篇关于Spark数据集的精彩文章(thisthisthis)之后,我总结了下一个DataSet相对于RDD的性能优势:


逻辑和物理计划优化;
严格的类型化;
向量化运算;
低级内存管理。


问题:


Spark的RDD还可以构建物理计划,并且可以在同一阶段组合/优化多个转换。那么,DataSet相对于RDD有什么好处?
the first link中,您可以看到RDD[Person]的示例。 DataSet是否具有高级类型?
“向量化操作”是什么意思?
据我了解,DataSet的低内存管理=高级序列化。这意味着可序列化对象的堆外存储,在这里您只能读取对象的一个​​字段而无需反序列化。但是,当您具有IN_MEMORY_ONLY持久策略时,情况又如何呢? DataSet是否会序列化所有情况?与RDD相比,它将具有任何性能优势吗?

最佳答案

Spark的RDD还可以构建物理计划,并且可以在同一阶段组合/优化多个转换。比起RDD,DataSet有什么好处?


使用RDD时,您所写的就是所得到的。虽然某些转换通过链接进行了优化,但执行计划是DAG的直接翻译。例如:

rdd.mapPartitions(f).mapPartitions(g).mapPartitions(h).shuffle()


其中shuffle是任意改组转换(*byKeyrepartition等),所有三个mapPartitionsmapflatMapfilter)将被链接而不创建中间对象,但不能重新排列。

Datasets相比,使用限制性更强的编程模型,但可以使用多种技术来优化执行,包括:


选择(filter)下推。例如,如果您有:

df.withColumn("foo", col("bar") + 1).where(col("bar").isNotNull())


可以执行为:

df.where(col("bar").isNotNull()).withColumn("foo", col("bar") + 1)

早期预测(select)和消除。例如:

df.withColumn("foo", col("bar") + 1).select("foo", "bar")


可以重写为:

df.select("foo", "bar").withColumn("foo", col("bar") + 1)


避免获取和传递过时的数据。在极端情况下,它可以完全消除特定的转换:

df.withColumn("foo", col("bar") + 1).select("bar")


可以优化为

df.select("bar")



这些优化之所以可行,有两个原因:


限制性数据模型,可以进行依赖关系分析,而无需进行复杂且不可靠的静态代码分析。
清晰的运算符语义。运算符无副作用,我们可以明确区分确定性和不确定性。


为了清楚起见,假设我们有以下数据模型:

case class Person(name: String, surname: String, age: Int)

val people: RDD[Person] = ???


我们希望检索所有21岁以上的人的姓。使用RDD可以将其表示为:

people
  .map(p => (p.surname, p.age))          // f
  .filter { case (_, age) => age > 21 }  // g


现在让我们问自己几个问题:


age中的输入f和带有ageg变量之间是什么关系?
f然后gg然后f相同吗?
fg副作用是否免费?


尽管答案对于人类读者来说是显而易见的,但对于假设的优化器而言却不是。与Dataframe版本相比:

people.toDF
  .select(col("surname"), col("age"))    // f'
  .where(col("age") > 21)                // g'


对于优化人员和读者来说,答案都是显而易见的。

使用静态类型的DatasetsSpark 2.0 Dataset vs DataFrame)时,这还会带来其他后果。


DataSet是否具有更高级的类型化?



否-如果您关心优化。最高级的优化仅限于Dataset[Row],目前无法对复杂的类型层次进行编码。
也许-如果您接受Kryo或Java编码器的开销。



“矢量化操作”是什么意思?


在优化的上下文中,我们通常指的是循环矢量化/循环展开。 Spark SQL使用代码生成来创建高级转换的编译器友好版本,可以对其进行进一步优化以利用矢量化指令集。


据我了解,DataSet的低内存管理=高级序列化。


不完全是。使用本机分配的最大优势是转义垃圾回收器循环。由于垃圾回收通常是Spark的限制因素,因此这是一个巨大的改进,尤其是在需要大型数据结构(例如准备随机播放)的环境中。

另一个重要方面是柱状存储,它可以实现有效的压缩(可能会减少内存占用)并优化对压缩数据的操作。

通常,您可以在纯RDDs上使用手工编写的代码来应用完全相同的优化类型。毕竟DatasetsRDDs支持。区别仅在于需要付出多少努力。


手工执行的计划优化相对容易实现。
使代码编译器更友好需要更深入的知识,并且容易出错且冗长。
sun.misc.Unsafe与本机内存分配一起使用并不适合胆小者。


尽管具有所有优点,但Dataset API并不通用。尽管某些类型的常见任务可以在许多情况下从其优化中受益,但与RDD相比,您可能没有任何改善甚至性能下降。

关于apache-spark - 与RDD相比,DataSet的性能优势,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41333707/

相关文章:

java - 获取 java.lang.IllegalArgumentException : requirement failed while calling Sparks MLLIB StreamingKMeans from java application

apache-spark - Spark日志中task id格式的含义

python - PySpark - sortByKey() 方法以原始顺序从 k,v 对返回值

java - 无法修改JavaRDD中的值

apache-spark - 处理大数据集时出现 FetchFailedException 或 MetadataFetchFailedException

python - 使用 PySpark 读取 Excel 文件 : Failed to find data source: com. crealytics.spark.excel

python - 使用具有特征的原始 RDD 项将 Pyspark Python k-means 模型预测插入 DF 中

Spark 2.x 数据集的 Kryo 序列化

java - 尝试在 Java 中的 Spark 数据集中添加列时出现空指针异常

apache-spark - 缓存和检查点是否应该在数据集上一起使用?如果是这样,这是如何在引擎盖下工作的?