我有 2 个 df
df1:
- 列:col1、col2、col3
- 在 col1 上分区
- 分区数量:120000
df2:
- 列:col1、col2、col3
- 在 col1 上分区
- 分区数量:80000
现在我想加入 df1、df2(df1.col1=df2.col1 和 df1.col2=df2.col2),无需太多的洗牌
尝试加入,但花了很多时间...
我该怎么做..有人可以帮忙吗..?
最佳答案
在我看来,如果您的数据集之一很小(比如说几百MB),您可以尝试使用广播连接 - 在这种情况下,较小的数据集将被广播,您将跳过随机播放
如果没有广播提示,催化剂可能会选择 SMJ(排序合并连接),并且在此连接算法期间,数据需要通过连接键重新分区,然后排序。我准备了一个简单的例子
import org.apache.spark.sql.functions._
spark.conf.set("spark.sql.shuffle.partitions", "10")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
val data = Seq(("test", 3),("test", 3), ("test2", 5), ("test3", 7), ("test55", 86))
val data2 = Seq(("test", 3),("test", 3), ("test2", 5), ("test3", 6), ("test33", 76))
val df = data.toDF("Name", "Value").repartition(5, col("Name"))
df.show
val df2 = data2.toDF("Name", "Value").repartition(5, col("Name"))
df2.show
df.join(df2, Seq("Name", "Value")).show
autoBroadcastJoinThreshold 设置为 -1 以禁用广播连接
sql.shuffle.partitions 设置为 10 以表明 join 将在重新分区期间使用该值
我在连接之前对 dfs 重新分区了 5 个分区,并调用了操作以确保它们在连接之前按同一列进行分区
在 sql 选项卡中我可以看到 Spark 再次重新分区数据
如果您无法广播并且您的加入花费了很多时间,您可以检查是否存在一些偏差。
您可以阅读此blogpost by Dima Statz查找有关连接倾斜的更多信息
关于apache-spark - 如何在spark中加入2个已经用同一列分区的数据帧而不进行洗牌..?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/74389293/