scala - distinct() 是否对数据集进行排序?

标签 scala apache-spark

我正在编写一个预处理应用程序,除其他转换和操作外,它会在将数据集写入 HDFS 之前对数据集进行排序。一个新的请求要求我对数据集进行重复数据删除,所以我想在一个阶段进行排序。我的理解是,为了有效地进行重复数据删除,排序是必要的(也许我在这方面错了,没有研究太多,只是看起来很自然)。

由于某些原因(输出模式中的 MapType 列),我首先测试了 distinct早于 sort ,以为我会摆脱MapType稍后列以便将它们合并在一起。

Spark UI output

发生的事情是跳过了排序的第二阶段,就好像数据集已经排序了一样。这对我来说很有意义,但在文档 (AFAIK) 中的任何地方都不支持,我不知道,它是否是稳定的预期行为(我不想将其推向生产只是为了意识到我突然做 2 个昂贵的阶段:sortdistinct 两者)。任何人都对 sort 有更多的见解和/或 distinct实现的?

最佳答案

在 Spark 中,distinct以及一般所有的聚合操作(例如 groupBy )不要对数据进行排序。我们可以使用 explain 轻松检查。功能。

// Let's generate a df with 5 elements in [0, 4[ to have at least one duplicate
val data = spark.range(5).select(floor(rand() * 4) as "r")

data.distinct.explain
== Physical Plan ==
*HashAggregate(keys=[r#105L], functions=[])
+- Exchange hashpartitioning(r#105L, 200)
   +- *HashAggregate(keys=[r#105L], functions=[])
      +- *Project [FLOOR((rand(7842501052366484791) * 5.0)) AS r#105L]
         +- *Range (0, 10, step=1, splits=2)
HashAggregate + Exchange意味着元素被散列和混洗,以便具有相同散列的元素在同一分区中。然后,对具有相同散列的元素进行比较和去重。因此,数据不会在处理后排序。让我们检查一下:

data.distinct.show()
+---+                                                                           
|  r|
+---+
|  0|
|  3|
|  2|
+---+

现在让我们解决您对性能的担忧。如果您在重复数据删除后进行排序,则会发生以下情况。

data.distinct.orderBy("r").explain
== Physical Plan ==
*Sort [r#227L ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(r#227L ASC NULLS FIRST, 200)
   +- *HashAggregate(keys=[r#227L], functions=[])
      +- Exchange hashpartitioning(r#227L, 200)
         +- *HashAggregate(keys=[r#227L], functions=[])
            +- *Project [FLOOR((rand(-8636860894475783181) * 4.0)) AS r#227L]
               +- *Range (0, 5, step=1, splits=2)

我们可以看到数据被打乱以进行重复数据删除( Exchange hashpartitioning )并再次打乱以进行排序( Exchange rangepartitioning )。那相当昂贵。这是因为排序需要按范围进行洗牌,以便同一范围内的元素最终位于同一分区中,然后可以对其进行排序。然而,我们可以在重复数据删除之前更聪明地进行排序:

data.orderBy("r").distinct.explain
== Physical Plan ==
*HashAggregate(keys=[r#227L], functions=[])
+- *HashAggregate(keys=[r#227L], functions=[])
   +- *Sort [r#227L ASC NULLS FIRST], true, 0
      +- Exchange rangepartitioning(r#227L ASC NULLS FIRST, 200)
         +- *Project [FLOOR((rand(-8636860894475783181) * 4.0)) AS r#227L]
            +- *Range (0, 5, step=1, splits=2)

只剩下一个交换。事实上,spark 知道在按范围 shuffle 之后,重复的元素在同一个分区中。因此它不会触发新的洗牌。

关于scala - distinct() 是否对数据集进行排序?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56441314/

相关文章:

scala - UDF vs Spark sql vs 列表达式性能优化

java - 是否可以对 Apache Spark 中的所有工作人员执行命令?

Scala @ 运算符

java - ml.DecisionTreeClassificationModel 中的 mllib.DecisionTreeModel.toDebugString() 等效项

python - Spark : How to use HBase filter e. g QualiferFilter by python-api

mysql - 如何使用 SparkR 将 MySQL 数据库连接到 Apache Spark?

scala - 如何在Scala中产生格式正确的XML?

json - 如何从 scala 案例类中装饰不可变对象(immutable对象)图

scala - 将 cassandra 行映射到 Spark RDD 中的参数化类型

apache-spark - 如何将数据框的所有列转换为字符串