关于如何获取n RDD
和或DataFrame
的分区数,存在很多问题:答案总是:
rdd.getNumPartitions
或者
df.rdd.getNumPartitions
不幸的是,这是对
DataFrame
进行的昂贵操作,因为 df.rdd
需要从
DataFrame
转换为rdd
。这是运行时间的顺序 df.count
我在写逻辑,根据当前分区的数量是在可接受值的范围内还是在其之下或之上,根据需要选择
repartition
或coalesce
的DataFrame
。 def repartition(inDf: DataFrame, minPartitions: Option[Int],
maxPartitions: Option[Int]): DataFrame = {
val inputPartitions= inDf.rdd.getNumPartitions // EXPENSIVE!
val outDf = minPartitions.flatMap{ minp =>
if (inputPartitions < minp) {
info(s"Repartition the input from $inputPartitions to $minp partitions..")
Option(inDf.repartition(minp))
} else {
None
}
}.getOrElse( maxPartitions.map{ maxp =>
if (inputPartitions > maxp) {
info(s"Coalesce the input from $inputPartitions to $maxp partitions..")
inDf.coalesce(maxp)
} else inDf
}.getOrElse(inDf))
outDf
}
但是我们不能以这种方式为每个
rdd.getNumPartitions
付出DataFrame
的费用。有没有办法获取此信息-例如从在线/临时
catalog
的registered
表中查询?更新,Spark GUI显示DataFrame.rdd操作所花费的时间与作业中最长的sql一样长。我将重新运行该作业,并在此处附加屏幕截图。
以下只是一个测试用例:它仅使用生产中数据量的一小部分。最长的
sql
只有五分钟-这也是要花费那段时间的方式(请注意,此处sql
并没有得到帮助:它还必须随后执行,因此有效地将累计执行时间加倍)。我们可以看到
.rdd
第30行的DataFrameUtils
操作(如上面的代码段所示)需要5.1分钟-而save
操作仍然需要5.2分钟之后-即。就后续.rdd
的执行时间而言,我们没有通过执行save
节省任何时间。
最佳答案
rdd
中没有rdd.getNumPartitions
组件的固有成本,因为从不评估返回的RDD
。
尽管您可以凭经验轻松地确定这一点,但可以使用调试器(我将其留给读者练习),或者确定在基本情况下没有触发任何作业
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.4.0
/_/
Using Scala version 2.11.12 (OpenJDK 64-Bit Server VM, Java 1.8.0_181)
Type in expressions to have them evaluated.
Type :help for more information.
scala> val ds = spark.read.text("README.md")
ds: org.apache.spark.sql.DataFrame = [value: string]
scala> ds.rdd.getNumPartitions
res0: Int = 1
scala> spark.sparkContext.statusTracker.getJobIdsForGroup(null).isEmpty // Check if there are any known jobs
res1: Boolean = true
仅仅说服您还不够。因此,让我们以更系统的方式进行处理:
rdd
返回一个MapPartitionRDD
(上面定义的ds
):scala> ds.rdd.getClass
res2: Class[_ <: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]] = class org.apache.spark.rdd.MapPartitionsRDD
RDD.getNumPartitions
invokes RDD.partitions
。 RDD.partitions
invokes getPartitions
中(也可以跟踪检查点路径)。 RDD.getPartitions
is abstract。 MapPartitionsRDD.getPartitions
,即delegates the call to the parent。 MapPartitionsRDD
和源之间只有rdd
。scala> ds.rdd.toDebugString
res3: String =
(1) MapPartitionsRDD[3] at rdd at <console>:26 []
| MapPartitionsRDD[2] at rdd at <console>:26 []
| MapPartitionsRDD[1] at rdd at <console>:26 []
| FileScanRDD[0] at rdd at <console>:26 []
类似地,如果
Dataset
包含交换,我们将跟随 parent 到最近的随机播放:scala> ds.orderBy("value").rdd.toDebugString
res4: String =
(67) MapPartitionsRDD[13] at rdd at <console>:26 []
| MapPartitionsRDD[12] at rdd at <console>:26 []
| MapPartitionsRDD[11] at rdd at <console>:26 []
| ShuffledRowRDD[10] at rdd at <console>:26 []
+-(1) MapPartitionsRDD[9] at rdd at <console>:26 []
| MapPartitionsRDD[5] at rdd at <console>:26 []
| FileScanRDD[4] at rdd at <console>:26 []
请注意,这种情况特别有趣,因为我们实际上触发了一个作业:
scala> spark.sparkContext.statusTracker.getJobIdsForGroup(null).isEmpty
res5: Boolean = false
scala> spark.sparkContext.statusTracker.getJobIdsForGroup(null)
res6: Array[Int] = Array(0)
这是因为我们遇到了无法静态确定分区的情况(请参阅Number of dataframe partitions after sorting?和Why does sortBy transformation trigger a Spark job?)。
在这种情况下,
getNumPartitions
也将触发作业:scala> ds.orderBy("value").rdd.getNumPartitions
res7: Int = 67
scala> spark.sparkContext.statusTracker.getJobIdsForGroup(null) // Note new job id
res8: Array[Int] = Array(1, 0)
但是,这并不意味着所观察到的成本与
.rdd
调用相关。相反,如果没有静态公式(例如,某些Hadoop输入格式需要完全扫描数据),这是查找partitions
的固有成本。 请注意,此处提出的观点不应外推到
Dataset.rdd
的其他应用程序。例如,ds.rdd.count
确实是昂贵且浪费的。
关于scala - 如何在不产生.rdd成本的情况下检查Spark DataFrame的分区数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54268845/