apache-spark - 关于在 Spark 中加入数据帧的问题

标签 apache-spark pyspark apache-spark-sql pyspark-sql

假设我有两个分区的数据帧:

df1 = spark.createDataFrame(
    [(x,x,x) for x in range(5)], ['key1', 'key2', 'time']
).repartition(3, 'key1', 'key2')

df2 = spark.createDataFrame(
    [(x,x,x) for x in range(7)], ['key1', 'key2', 'time']
).repartition(3, 'key1', 'key2')

(场景1)如果我通过 [key1, key2] 加入它们,则在每个分区内执行 join 操作而不进行shuffle(结果数据帧中的分区数相同):
x = df1.join(df2, on=['key1', 'key2'], how='left')
assert x.rdd.getNumPartitions() == 3

(场景2)但是如果我通过 [key1, key2, time] 将它们连接起来,则会发生 shuffle 操作(结果数据框中的分区数为 200,由 spark.sql.shuffle.partitions 选项驱动):
x = df1.join(df2, on=['key1', 'key2', 'time'], how='left')
assert x.rdd.getNumPartitions() == 200

同时通过[key1, key2, time]进行groupby和window操作,保留分区数,不shuffle完成:
x = df1.groupBy('key1', 'key2', 'time').agg(F.count('*'))
assert x.rdd.getNumPartitions() == 3

我不明白这是一个错误还是在第二种情况下执行 shuffle 操作有一些原因?如果可能的话,我怎样才能避免洗牌?

最佳答案

我想能够找出 Python 和 Scala 中不同结果的原因。

原因在于广播优化。如果 spark-shell 在禁用广播的情况下启动,Python 和 Scala 的工作方式相同。

./spark-shell --conf spark.sql.autoBroadcastJoinThreshold=-1

val df1 = Seq(
  (1, 1, 1)
).toDF("key1", "key2", "time").repartition(3, col("key1"), col("key2"))

val df2 = Seq(
  (1, 1, 1),
  (2, 2, 2)
).toDF("key1", "key2", "time").repartition(3, col("key1"), col("key2"))

val x = df1.join(df2, usingColumns = Seq("key1", "key2", "time"))

x.rdd.getNumPartitions == 200

所以看起来 spark 2.4.0 无法像@user10938362 所建议的那样优化开箱即用的描述案例和所需的催化剂优化器扩展。

顺便提一句。以下是有关编写催化剂优化器扩展的信息 https://developer.ibm.com/code/2017/11/30/learn-extension-points-apache-spark-extend-spark-catalyst-optimizer/

关于apache-spark - 关于在 Spark 中加入数据帧的问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55229290/

相关文章:

java - 使用 Spark 定义带有数字列表及其相关概率的随机变量

sql-server - 检查表是否存在 Spark jdbc

apache-spark - 使用 Spark 从日期列中获取周末日期

python - 检索数据框中唯一的列组合的任意行

apache-spark - Spark 结构流和批处理是否相同?

scala - 如何在 Spark 中找到分组数据的确切中位数

python - pyspark中聚合函数后如何保持列顺序一致

apache-spark - Spark RDD 缓存能走多远?

hadoop - 如何配置 pyspark 默认写入 HDFS?

python-3.x - 带时区的 Pyspark to_timestamp