在按 year
分区的 Parquet 数据湖中和month
,与 spark.default.parallelism
设置为即 4
,假设我想创建一个由 2017 年第 11~12 个月和 2018 年第 1~3 个月两个来源组成的 DataFrame A
和B
.
df = spark.read.parquet(
"A.parquet/_YEAR={2017}/_MONTH={11,12}",
"A.parquet/_YEAR={2018}/_MONTH={1,2,3}",
"B.parquet/_YEAR={2017}/_MONTH={11,12}",
"B.parquet/_YEAR={2018}/_MONTH={1,2,3}",
)
如果我得到分区数,Spark 使用 spark.default.parallelism
默认:
df.rdd.getNumPartitions()
Out[4]: 4
考虑到创建df
后我需要执行join
和groupBy
每个时期的操作,并且数据或多或少均匀地分布在每个时期(每个时期大约 1000 万行):
问题
- 重新分区会提高后续操作的性能吗?
- 如果是这样,如果我有 10 个不同的期间(A 和 B 每年 5 个),我是否应该按期间数重新分区并明确引用要重新分区的列 (
df.repartition(10,'_MONTH','_YEAR')
)?
最佳答案
Will a repartition improve the performance of my subsequent operations?
通常不会。抢占式重新分区数据的唯一原因是,当基于相同的条件将相同的数据集
用于多个连接时,避免进一步洗牌
If so, if I have 10 different periods (5 per year in both A and B), should I repartition by the number of periods and explicitly reference the columns to repartition (df.repartition(10,'_MONTH','_YEAR'))?
让我们一步一步来:
-
should I repartition by the number of periods
从业者不保证级别和分区之间的 1:1 关系,因此唯一要记住的是,您不能拥有比唯一键更多的非空分区,因此使用明显更大的值是没有意义的。
-
and explicitly reference the columns to repartition
如果您
重新分区
,然后加入
或groupBy
,对两个部分使用相同的列集是唯一明智的解决方案。
摘要
在加入之前重新分区
在两种情况下有意义:
如果有多个后续
连接
df_ = df.repartition(10, "foo", "bar") df_.join(df1, ["foo", "bar"]) ... df_.join(df2, ["foo", "bar"])
当所需的输出分区数量与
spark.sql.shuffle.partitions
不同时使用单连接(并且没有广播连接)spark.conf.get("spark.sql.shuffle.partitions") # 200 spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1) df1_ = df1.repartition(11, "foo", "bar") df2_ = df2.repartition(11, "foo", "bar") df1_.join(df2_, ["foo", "bar"]).rdd.getNumPartitions() # 11 df1.join(df2, ["foo", "bar"]).rdd.getNumPartitions() # 200
这可能比以下更好:
spark.conf.set("spark.sql.shuffle.partitions", 11) df1.join(df2, ["foo", "bar"]).rdd.getNumPartitions() spark.conf.set("spark.sql.shuffle.partitions", 200)
关于apache-spark - PySpark - 优化 Parquet 读取后的分区数量,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50696528/