假设我有两个分区的数据帧:
df1 = spark.createDataFrame(
[(x,x,x) for x in range(5)], ['key1', 'key2', 'time']
).repartition(3, 'key1', 'key2')
df2 = spark.createDataFrame(
[(x,x,x) for x in range(7)], ['key1', 'key2', 'time']
).repartition(3, 'key1', 'key2')
(场景1)如果我通过 [key1, key2] 加入它们,则在每个分区内执行 join 操作而不进行shuffle(结果数据帧中的分区数相同):
x = df1.join(df2, on=['key1', 'key2'], how='left')
assert x.rdd.getNumPartitions() == 3
(场景2)但是如果我通过 [key1, key2, time] 将它们连接起来,则会发生 shuffle 操作(结果数据框中的分区数为 200,由 spark.sql.shuffle.partitions 选项驱动):
x = df1.join(df2, on=['key1', 'key2', 'time'], how='left')
assert x.rdd.getNumPartitions() == 200
同时通过[key1, key2, time]进行groupby和window操作,保留分区数,不shuffle完成:
x = df1.groupBy('key1', 'key2', 'time').agg(F.count('*'))
assert x.rdd.getNumPartitions() == 3
我不明白这是一个错误还是在第二种情况下执行 shuffle 操作有一些原因?如果可能的话,我怎样才能避免洗牌?
最佳答案
我想能够找出 Python 和 Scala 中不同结果的原因。
原因在于广播优化。如果 spark-shell 在禁用广播的情况下启动,Python 和 Scala 的工作方式相同。
./spark-shell --conf spark.sql.autoBroadcastJoinThreshold=-1
val df1 = Seq(
(1, 1, 1)
).toDF("key1", "key2", "time").repartition(3, col("key1"), col("key2"))
val df2 = Seq(
(1, 1, 1),
(2, 2, 2)
).toDF("key1", "key2", "time").repartition(3, col("key1"), col("key2"))
val x = df1.join(df2, usingColumns = Seq("key1", "key2", "time"))
x.rdd.getNumPartitions == 200
所以看起来 spark 2.4.0 无法像@user10938362 所建议的那样优化开箱即用的描述案例和所需的催化剂优化器扩展。
顺便提一句。以下是有关编写催化剂优化器扩展的信息 https://developer.ibm.com/code/2017/11/30/learn-extension-points-apache-spark-extend-spark-catalyst-optimizer/
关于apache-spark - 关于在 Spark 中加入数据帧的问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55229290/