apache-spark - Spark : What is the difference between repartition and repartitionByRange?

标签 apache-spark pyspark apache-spark-sql

我浏览了这里的文档:https://spark.apache.org/docs/latest/api/python/pyspark.sql.html
它说:

  • 用于重新分区:生成的 DataFrame 是散列分区的。
  • 对于 repartitionByRange:生成的 DataFrame 是范围分区的。

  • 还有一个 previous question也提到了。但是,我仍然不明白它们究竟有何不同,以及在选择其中一个时会产生什么影响?
    更重要的是 , 如果 repartition 进行哈希分区,则提供列作为其参数有什么影响?

    最佳答案

    我认为最好通过一些实验来研究差异。
    测试数据帧
    对于这个实验,我使用了以下两个数据帧(我在 Scala 中展示了代码,但概念与 Python API 相同):

    // Dataframe with one column "value" containing the values ranging from 0 to 1000000
    val df = Seq(0 to 1000000: _*).toDF("value")
    
    // Dataframe with one column "value" containing 1000000 the number 0 in addition to the numbers 5000, 10000 and 100000
    val df2 = Seq((0 to 1000000).map(_ => 0) :+ 5000 :+ 10000 :+ 100000: _*).toDF("value")
    
    理论
  • repartition适用于 HashPartitioner当提供一列或多列且 RoundRobinPartitioner它在提供的分区数量上均匀分布数据。如果提供了一列(或更多),这些值将被散列并用于通过计算类似 partition = hash(columns) % numberOfPartitions 的内容来确定分区号。 .
  • repartitionByRange将根据列值的范围对数据进行分区。这通常用于连续(非离散)值,例如任何类型的数字。请注意,由于性能原因,此方法使用采样来估计范围。因此,输出可能不一致,因为采样可能返回不同的值。样本大小可以通过配置 spark.sql.execution.rangeExchange.sampleSizePerPartition 控制.

  • 还值得一提的是,对于这两种方法,如果没有 numPartitions给定,默认情况下它会将 Dataframe 数据分区为 spark.sql.shuffle.partitions在您的 Spark session 中配置,并且可以通过自适应查询执行合并(自 Spark 3.x 起可用)。
    测试设置
    基于给定的 Testdata 我总是应用相同的代码:
    val testDf = df
    // here I will insert the partition logic
        .withColumn("partition", spark_partition_id()) // applying SQL built-in function to determin actual partition
        .groupBy(col("partition"))
        .agg(
          count(col("value")).as("count"),
          min(col("value")).as("min_value"),
          max(col("value")).as("max_value"))
        .orderBy(col("partition"))
    
    testDf.show(false)
    
    检测结果
    df.repartition(4, col("value"))
    正如预期的那样,我们得到 4 个分区,因为 df 的值范围从 0 到 1000000,我们看到它们的散列值将导致分布良好的数据帧。
    +---------+------+---------+---------+
    |partition|count |min_value|max_value|
    +---------+------+---------+---------+
    |0        |249911|12       |1000000  |
    |1        |250076|6        |999994   |
    |2        |250334|2        |999999   |
    |3        |249680|0        |999998   |
    +---------+------+---------+---------+
    
    df.repartitionByRange(4, col("value"))
    同样在这种情况下,我们得到 4 个分区,但这次最小值和最大值清楚地显示了分区内的值范围。它几乎均匀分布,每个分区有 250000 个值。
    +---------+------+---------+---------+
    |partition|count |min_value|max_value|
    +---------+------+---------+---------+
    |0        |244803|0        |244802   |
    |1        |255376|244803   |500178   |
    |2        |249777|500179   |749955   |
    |3        |250045|749956   |1000000  |
    +---------+------+---------+---------+
    
    df2.repartition(4, col("value"))
    现在,我们正在使用另一个 Dataframe df2 .在这里,散列算法对只有 0、5000、10000 或 100000 的值进行散列。当然,值 0 的散列将始终相同,因此所有零最终都在同一个分区中(在这种情况下,分区 3 )。其他两个分区只包含一个值。
    +---------+-------+---------+---------+
    |partition|count  |min_value|max_value|
    +---------+-------+---------+---------+
    |0        |1      |100000   |100000   |
    |1        |1      |10000    |10000    |
    |2        |1      |5000     |5000     |
    |3        |1000001|0        |0        |
    +---------+-------+---------+---------+
    
    df2.repartition(4)
    不使用列“值”的内容repartition方法将在 RoundRobin 的基础上分发消息。所有分区的数据量几乎相同。
    +---------+------+---------+---------+
    |partition|count |min_value|max_value|
    +---------+------+---------+---------+
    |0        |250002|0        |5000     |
    |1        |250002|0        |10000    |
    |2        |249998|0        |100000   |
    |3        |250002|0        |0        |
    +---------+------+---------+---------+
    
    df2.repartitionByRange(4, col("value"))
    这个案例表明Dataframe df2由于几乎所有值都是 0,因此对于按范围重新分区没有明确定义。因此,我们甚至最终只有两个分区,而分区 0 包含所有零。
    +---------+-------+---------+---------+
    |partition|count  |min_value|max_value|
    +---------+-------+---------+---------+
    |0        |1000001|0        |0        |
    |1        |3      |5000     |100000   |
    +---------+-------+---------+---------+
    

    关于apache-spark - Spark : What is the difference between repartition and repartitionByRange?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/65809909/

    相关文章:

    scala - 如何从 Spark SQL DataFrame 中的 MapType 列获取键和值

    java - 在同一 JVM 中检测到多个 SparkContext

    apache-spark - Apache Spark : SparkFiles. get(fileName.txt) - 无法从 SparkContext 检索文件内容

    apache-spark - spark中如何区分操作是转换还是 Action ?

    python - 合并 PysPark 中的重叠区间

    apache-spark - SparkSession.sql 和 Dataset.sqlContext.sql 有什么区别?

    apache-spark - Spark : cache RDD to be used in another job

    apache-spark - pyspark:稀疏向量到 scipy 稀疏矩阵

    apache-spark - 在读取 csv 时在 Spark-2.2.0 中使用双引号处理多行数据

    python - PySpark UDF 返回可变大小的元组