scala - Apache Spark - dataset.dropDuplicates() 是否保留分区?

标签 scala apache-spark apache-spark-sql

我知道存在几种保留父分区的转换(如果之前设置过 - 例如 mapValues),还有一些转换不保留它(例如 map)。

我使用Spark 2.2的Dataset API。我的问题是 - dropDuplicates 转换是否保留分区?想象一下这段代码:

case class Item(one: Int, two: Int, three: Int)

import session.implicits._
val ds = session.createDataset(List(Item(1,2,3), Item(1,2,3)))

val repart = ds.repartition('one, 'two).cache()

repart.dropDuplicates(List("one", "two")) // will be partitioning preserved?

最佳答案

通常,dropDuplicates会进行随机播放(因此不会保留分区),但在您的特殊情况下,它不会进行额外的随机播放,因为您已经以合适的形式对数据集进行了分区优化器考虑到:

repart.dropDuplicates(List("one","two")).explain()

 == Physical Plan ==
*HashAggregate(keys=[one#3, two#4, three#5], functions=[])
+- *HashAggregate(keys=[one#3, two#4, three#5], functions=[])
   +- InMemoryTableScan [one#3, two#4, three#5]
         +- InMemoryRelation [one#3, two#4, three#5], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
               +- Exchange hashpartitioning(one#3, two#4, 200)
                  +- LocalTableScan [one#3, two#4, three#5]

此处要查找的关键字是:Exchange

但请考虑以下代码,您首先使用普通的 repartition() 重新分区数据集:

val repart = ds.repartition(200).cache()
repart.dropDuplicates(List("one","two")).explain()

这确实会触发额外的随机播放(现在您有 2 个 Exchange 步骤):

== Physical Plan ==
*HashAggregate(keys=[one#3, two#4], functions=[first(three#5, false)])
+- Exchange hashpartitioning(one#3, two#4, 200)
   +- *HashAggregate(keys=[one#3, two#4], functions=[partial_first(three#5, false)])
      +- InMemoryTableScan [one#3, two#4, three#5]
            +- InMemoryRelation [one#3, two#4, three#5], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
                  +- Exchange RoundRobinPartitioning(200)
                     +- LocalTableScan [one#3, two#4, three#5]

注意:我在 Spark 2.1 中检查过,在 Spark 2.2 中可能有所不同,因为 Spark 2.2 中的优化器发生了变化(基于成本的优化器)

关于scala - Apache Spark - dataset.dropDuplicates() 是否保留分区?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48280008/

相关文章:

scala - 使用 Dockerfile 安装 Docker

apache-spark - 如何使用 CrossValidator 获得精确度/召回率,以使用 Spark 训练 NaiveBayes 模型

apache-spark - 用于 Json 数据的 Spark

amazon-s3 - Databricks 中的显式表分区如何影响写入性能?

python - 如何在 PySpark 中一天内累积超过 '1 hour' 个窗口

Scala 括号 java.lang.StackOverflowError

scala - 在 Play 中以编程方式获取路线!框架 2.5.x

apache-spark - Spark/Databricks SQL 输出中的时间戳时区错误/缺失

java - 如何从行值创建新列

scala - RDD 拆分并在新 RDD 上进行聚合