我正在编写一个预处理应用程序,除其他转换和操作外,它会在将数据集写入 HDFS 之前对数据集进行排序。一个新的请求要求我对数据集进行重复数据删除,所以我想在一个阶段进行排序。我的理解是,为了有效地进行重复数据删除,排序是必要的(也许我在这方面错了,没有研究太多,只是看起来很自然)。
由于某些原因(输出模式中的 MapType
列),我首先测试了 distinct
早于 sort
,以为我会摆脱MapType
稍后列以便将它们合并在一起。
发生的事情是跳过了排序的第二阶段,就好像数据集已经排序了一样。这对我来说很有意义,但在文档 (AFAIK) 中的任何地方都不支持,我不知道,它是否是稳定的预期行为(我不想将其推向生产只是为了意识到我突然做 2 个昂贵的阶段:sort
和 distinct
两者)。任何人都对 sort
有更多的见解和/或 distinct
实现的?
最佳答案
在 Spark 中,distinct
以及一般所有的聚合操作(例如 groupBy
)不要对数据进行排序。我们可以使用 explain
轻松检查。功能。
// Let's generate a df with 5 elements in [0, 4[ to have at least one duplicate
val data = spark.range(5).select(floor(rand() * 4) as "r")
data.distinct.explain
== Physical Plan ==
*HashAggregate(keys=[r#105L], functions=[])
+- Exchange hashpartitioning(r#105L, 200)
+- *HashAggregate(keys=[r#105L], functions=[])
+- *Project [FLOOR((rand(7842501052366484791) * 5.0)) AS r#105L]
+- *Range (0, 10, step=1, splits=2)
HashAggregate
+ Exchange
意味着元素被散列和混洗,以便具有相同散列的元素在同一分区中。然后,对具有相同散列的元素进行比较和去重。因此,数据不会在处理后排序。让我们检查一下:data.distinct.show()
+---+
| r|
+---+
| 0|
| 3|
| 2|
+---+
现在让我们解决您对性能的担忧。如果您在重复数据删除后进行排序,则会发生以下情况。
data.distinct.orderBy("r").explain
== Physical Plan ==
*Sort [r#227L ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(r#227L ASC NULLS FIRST, 200)
+- *HashAggregate(keys=[r#227L], functions=[])
+- Exchange hashpartitioning(r#227L, 200)
+- *HashAggregate(keys=[r#227L], functions=[])
+- *Project [FLOOR((rand(-8636860894475783181) * 4.0)) AS r#227L]
+- *Range (0, 5, step=1, splits=2)
我们可以看到数据被打乱以进行重复数据删除(
Exchange hashpartitioning
)并再次打乱以进行排序( Exchange rangepartitioning
)。那相当昂贵。这是因为排序需要按范围进行洗牌,以便同一范围内的元素最终位于同一分区中,然后可以对其进行排序。然而,我们可以在重复数据删除之前更聪明地进行排序:data.orderBy("r").distinct.explain
== Physical Plan ==
*HashAggregate(keys=[r#227L], functions=[])
+- *HashAggregate(keys=[r#227L], functions=[])
+- *Sort [r#227L ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(r#227L ASC NULLS FIRST, 200)
+- *Project [FLOOR((rand(-8636860894475783181) * 4.0)) AS r#227L]
+- *Range (0, 5, step=1, splits=2)
只剩下一个交换。事实上,spark 知道在按范围 shuffle 之后,重复的元素在同一个分区中。因此它不会触发新的洗牌。
关于scala - distinct() 是否对数据集进行排序?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56441314/