scala - 如何在不产生.rdd成本的情况下检查Spark DataFrame的分区数

标签 scala apache-spark partition

关于如何获取n RDD和或DataFrame的分区数,存在很多问题:答案总是:

 rdd.getNumPartitions

或者
 df.rdd.getNumPartitions

不幸的是,这是对DataFrame进行的昂贵操作,因为
 df.rdd

需要从DataFrame转换为rdd。这是运行时间的顺序
 df.count

我在写逻辑,根据当前分区的数量是在可接受值的范围内还是在其之下或之上,根据需要选择repartitioncoalesceDataFrame
  def repartition(inDf: DataFrame, minPartitions: Option[Int],
       maxPartitions: Option[Int]): DataFrame = {
    val inputPartitions= inDf.rdd.getNumPartitions  // EXPENSIVE!
    val outDf = minPartitions.flatMap{ minp =>
      if (inputPartitions < minp) {
        info(s"Repartition the input from $inputPartitions to $minp partitions..")
        Option(inDf.repartition(minp))
      } else {
        None
      }
    }.getOrElse( maxPartitions.map{ maxp =>
      if (inputPartitions > maxp) {
        info(s"Coalesce the input from $inputPartitions to $maxp partitions..")
        inDf.coalesce(maxp)
      } else inDf
    }.getOrElse(inDf))
    outDf
  }

但是我们不能以这种方式为每个rdd.getNumPartitions付出DataFrame的费用。

有没有办法获取此信息-例如从在线/临时catalogregistered表中查询?

更新,Spark GUI显示DataFrame.rdd操作所花费的时间与作业中最长的sql一样长。我将重新运行该作业,并在此处附加屏幕截图。

以下只是一个测试用例:它仅使用生产中数据量的一小部分。最长的sql只有五分钟-这也是要花费那段时间的方式(请注意,此处sql并没有得到帮助:它还必须随后执行,因此有效地将累计执行时间加倍)。

enter image description here

我们可以看到.rdd第30行的DataFrameUtils操作(如上面的代码段所示)需要5.1分钟-而save操作仍然需要5.2分钟之后-即。就后续.rdd的执行时间而言,我们没有通过执行save节省任何时间。

最佳答案

rdd中没有rdd.getNumPartitions组件的固有成本,因为从不评估返回的RDD

尽管您可以凭经验轻松地确定这一点,但可以使用调试器(我将其留给读者练习),或者确定在基本情况下没有触发任何作业

Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.0
      /_/

Using Scala version 2.11.12 (OpenJDK 64-Bit Server VM, Java 1.8.0_181)
Type in expressions to have them evaluated.
Type :help for more information.

scala> val ds = spark.read.text("README.md")
ds: org.apache.spark.sql.DataFrame = [value: string]

scala> ds.rdd.getNumPartitions
res0: Int = 1

scala> spark.sparkContext.statusTracker.getJobIdsForGroup(null).isEmpty // Check if there are any known jobs
res1: Boolean = true

仅仅说服您还不够。因此,让我们以更系统的方式进行处理:
  • rdd返回一个MapPartitionRDD(上面定义的ds):
    scala> ds.rdd.getClass
    res2: Class[_ <: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]] = class org.apache.spark.rdd.MapPartitionsRDD
    
  • RDD.getNumPartitions invokes RDD.partitions
  • 在非检查点方案 RDD.partitions invokes getPartitions 中(也可以跟踪检查点路径)。
  • RDD.getPartitions is abstract
  • 因此,在这种情况下使用的实际实现是MapPartitionsRDD.getPartitions,即delegates the call to the parent
  • MapPartitionsRDD和源之间只有rdd
    scala> ds.rdd.toDebugString
    res3: String =
    (1) MapPartitionsRDD[3] at rdd at <console>:26 []
     |  MapPartitionsRDD[2] at rdd at <console>:26 []
     |  MapPartitionsRDD[1] at rdd at <console>:26 []
     |  FileScanRDD[0] at rdd at <console>:26 []
    

    类似地,如果Dataset包含交换,我们将跟随 parent 到最近的随机播放:
    scala> ds.orderBy("value").rdd.toDebugString
    res4: String =
    (67) MapPartitionsRDD[13] at rdd at <console>:26 []
     |   MapPartitionsRDD[12] at rdd at <console>:26 []
     |   MapPartitionsRDD[11] at rdd at <console>:26 []
     |   ShuffledRowRDD[10] at rdd at <console>:26 []
     +-(1) MapPartitionsRDD[9] at rdd at <console>:26 []
        |  MapPartitionsRDD[5] at rdd at <console>:26 []
        |  FileScanRDD[4] at rdd at <console>:26 []
    

    请注意,这种情况特别有趣,因为我们实际上触发了一个作业:
    scala> spark.sparkContext.statusTracker.getJobIdsForGroup(null).isEmpty
    res5: Boolean = false
    
    scala> spark.sparkContext.statusTracker.getJobIdsForGroup(null)
    res6: Array[Int] = Array(0)
    

    这是因为我们遇到了无法静态确定分区的情况(请参阅Number of dataframe partitions after sorting?Why does sortBy transformation trigger a Spark job?)。

    在这种情况下,getNumPartitions也将触发作业:
    scala> ds.orderBy("value").rdd.getNumPartitions
    res7: Int = 67
    
    scala> spark.sparkContext.statusTracker.getJobIdsForGroup(null)  // Note new job id
    res8: Array[Int] = Array(1, 0)
    

    但是,这并不意味着所观察到的成本与.rdd调用相关。相反,如果没有静态公式(例如,某些Hadoop输入格式需要完全扫描数据),这是查找partitions的固有成本。

  • 请注意,此处提出的观点不应外推到Dataset.rdd的其他应用程序。例如,ds.rdd.count确实是昂贵且浪费的。

    关于scala - 如何在不产生.rdd成本的情况下检查Spark DataFrame的分区数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54268845/

    相关文章:

    debugging - 如何: debug Scala code when outside of an IDE

    scala - 有没有办法在 Scala 食谱 Actor 通信示例中避免无类型的 ActorRef?

    scala - 在 Spark/Scala 中将 Array[Row] 转换为 DataFrame

    mysql - 如何从分区中选择,同时在 MySQL 中连接它的名称

    mysql - 如何在 SQL 百分位窗口函数上添加条件?

    javascript - 基于 'contain'的数组分区

    java - 在调用函数之前更新按钮点击时的标签

    java - 在 IntelliJ 中使用 Java 8 构建 Scala(2.7.0) maven 项目时出现错误 "error while loading Consumer"

    scala - 从检查点重新启动后 Spark Streaming 选项卡消失

    scala - 如何根据通配符/正则表达式条件在 Spark 中连接 2 个数据帧?