apache-spark - PySpark - 优化 Parquet 读取后的分区数量

标签 apache-spark pyspark partitioning parquet

在按 year 分区的 Parquet 数据湖中和month ,与 spark.default.parallelism设置为即 4 ,假设我想创建一个由 2017 年第 11~12 个月和 2018 年第 1~3 个月两个来源组成的 DataFrame AB .

df = spark.read.parquet(
    "A.parquet/_YEAR={2017}/_MONTH={11,12}",
    "A.parquet/_YEAR={2018}/_MONTH={1,2,3}",
    "B.parquet/_YEAR={2017}/_MONTH={11,12}",
    "B.parquet/_YEAR={2018}/_MONTH={1,2,3}",
)

如果我得到分区数,Spark 使用 spark.default.parallelism默认:

df.rdd.getNumPartitions()
Out[4]: 4

考虑到创建df后我需要执行joingroupBy每个时期的操作,并且数据或多或少均匀地分布在每个时期(每个时期大约 1000 万行):

问题

  • 重新分区会提高后续操作的性能吗?
  • 如果是这样,如果我有 10 个不同的期间(A 和 B 每年 5 个),我是否应该按期间数重新分区并明确引用要重新分区的列 ( df.repartition(10,'_MONTH','_YEAR') )?

最佳答案

Will a repartition improve the performance of my subsequent operations?

通常不会。抢占式重新分区数据的唯一原因是,当基于相同的条件将相同的数据集用于多个连接时,避免进一步洗牌

If so, if I have 10 different periods (5 per year in both A and B), should I repartition by the number of periods and explicitly reference the columns to repartition (df.repartition(10,'_MONTH','_YEAR'))?

让我们一步一步来:

  • should I repartition by the number of periods

    从业者不保证级别和分区之间的 1:1 关系,因此唯一要记住的是,您不能拥有比唯一键更多的非空分区,因此使用明显更大的值是没有意义的。

  • and explicitly reference the columns to repartition

    如果您重新分区,然后加入groupBy,对两个部分使用相同的列集是唯一明智的解决方案。

摘要

在加入之前重新分区在两种情况下有意义:

  • 如果有多个后续连接

    df_ = df.repartition(10, "foo", "bar")
    df_.join(df1, ["foo", "bar"])
    ...
    df_.join(df2, ["foo", "bar"])
    
  • 当所需的输出分区数量与spark.sql.shuffle.partitions不同时使用单连接(并且没有广播连接)

    spark.conf.get("spark.sql.shuffle.partitions")
    # 200
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
    
    df1_ = df1.repartition(11, "foo", "bar")
    df2_ = df2.repartition(11, "foo", "bar")
    
    df1_.join(df2_, ["foo", "bar"]).rdd.getNumPartitions()
    # 11
    
    df1.join(df2, ["foo", "bar"]).rdd.getNumPartitions()
    # 200
    

    这可能比以下更好:

    spark.conf.set("spark.sql.shuffle.partitions", 11)
    df1.join(df2, ["foo", "bar"]).rdd.getNumPartitions()
    spark.conf.set("spark.sql.shuffle.partitions", 200)
    

关于apache-spark - PySpark - 优化 Parquet 读取后的分区数量,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50696528/

相关文章:

python - PySpark 2.4.5 与 Python 3.8.3 不兼容,我该如何解决?

scala - Apache Spark 根据列的不同值计算列值

azure - 如何在 Azure Synapse 中使用 PySpark 将文件装载为文件对象

apache-spark - 使用 Apache-Spark,根据条件减少或折叠 RDD

log4j - 如何抑制在 EMR 上运行的 spark-sql 的 INFO 消息?

python-2.7 - 如何使用 PySpark 检查 Hive 表是否存在

apache-spark - Pyspark - 如何拆分具有 Datetime 类型结构值的列?

MySQL注释掉alter table分区语句

c# - 以编程方式更改 FAT32 卷序列号

Linux 磁盘分区和 Nginx