我知道存在几种保留父分区的转换(如果之前设置过 - 例如 mapValues
),还有一些转换不保留它(例如 map
)。
我使用Spark 2.2的Dataset API。我的问题是 - dropDuplicates 转换是否保留分区?想象一下这段代码:
case class Item(one: Int, two: Int, three: Int)
import session.implicits._
val ds = session.createDataset(List(Item(1,2,3), Item(1,2,3)))
val repart = ds.repartition('one, 'two).cache()
repart.dropDuplicates(List("one", "two")) // will be partitioning preserved?
最佳答案
通常,dropDuplicates
会进行随机播放(因此不会保留分区),但在您的特殊情况下,它不会进行额外的随机播放,因为您已经以合适的形式对数据集进行了分区优化器考虑到:
repart.dropDuplicates(List("one","two")).explain()
== Physical Plan ==
*HashAggregate(keys=[one#3, two#4, three#5], functions=[])
+- *HashAggregate(keys=[one#3, two#4, three#5], functions=[])
+- InMemoryTableScan [one#3, two#4, three#5]
+- InMemoryRelation [one#3, two#4, three#5], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
+- Exchange hashpartitioning(one#3, two#4, 200)
+- LocalTableScan [one#3, two#4, three#5]
此处要查找的关键字是:Exchange
但请考虑以下代码,您首先使用普通的 repartition()
重新分区数据集:
val repart = ds.repartition(200).cache()
repart.dropDuplicates(List("one","two")).explain()
这确实会触发额外的随机播放(现在您有 2 个 Exchange
步骤):
== Physical Plan ==
*HashAggregate(keys=[one#3, two#4], functions=[first(three#5, false)])
+- Exchange hashpartitioning(one#3, two#4, 200)
+- *HashAggregate(keys=[one#3, two#4], functions=[partial_first(three#5, false)])
+- InMemoryTableScan [one#3, two#4, three#5]
+- InMemoryRelation [one#3, two#4, three#5], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
+- Exchange RoundRobinPartitioning(200)
+- LocalTableScan [one#3, two#4, three#5]
注意:我在 Spark 2.1 中检查过,在 Spark 2.2 中可能有所不同,因为 Spark 2.2 中的优化器发生了变化(基于成本的优化器)
关于scala - Apache Spark - dataset.dropDuplicates() 是否保留分区?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48280008/