apache-spark - 如何在spark中加入2个已经用同一列分区的数据帧而不进行洗牌..?

标签 apache-spark pyspark apache-spark-sql

我有 2 个 df

df1:

  • 列:col1、col2、col3
  • 在 col1 上分区
  • 分区数量:120000

df2:

  • 列:col1、col2、col3
  • 在 col1 上分区
  • 分区数量:80000

现在我想加入 df1、df2(df1.col1=df2.col1 和 df1.col2=df2.col2),无需太多的洗牌

尝试加入,但花了很多时间...

我该怎么做..有人可以帮忙吗..?

最佳答案

在我看来,如果您的数据集之一很小(比如说几百MB),您可以尝试使用广播连接 - 在这种情况下,较小的数据集将被广播,您将跳过随机播放

如果没有广播提示,催化剂可能会选择 SMJ(排序合并连接),并且在此连接算法期间,数据需要通过连接键重新分区,然后排序。我准备了一个简单的例子

import org.apache.spark.sql.functions._
spark.conf.set("spark.sql.shuffle.partitions", "10")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

val data = Seq(("test", 3),("test", 3), ("test2", 5), ("test3", 7), ("test55", 86))
val data2 = Seq(("test", 3),("test", 3), ("test2", 5), ("test3", 6), ("test33", 76))

val df = data.toDF("Name", "Value").repartition(5, col("Name"))
df.show
val df2 = data2.toDF("Name", "Value").repartition(5, col("Name"))
df2.show

df.join(df2, Seq("Name", "Value")).show

autoBroadcastJoinThreshold 设置为 -1 以禁用广播连接

sql.shuffle.partitions 设置为 10 以表明 join 将在重新分区期间使用该值

我在连接之前对 dfs 重新分区了 5 个分区,并调用了操作以确保它们在连接之前按同一列进行分区

在 sql 选项卡中我可以看到 Spark 再次重新分区数据

enter image description here

如果您无法广播并且您的加入花费了很多时间,您可以检查是否存在一些偏差。

您可以阅读此blogpost by Dima Statz查找有关连接倾斜的更多信息

关于apache-spark - 如何在spark中加入2个已经用同一列分区的数据帧而不进行洗牌..?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/74389293/

相关文章:

apache-spark - 将自定义退出代码从yarn-cluster模式spark传递到CLI

apache-spark - 如何将数据从 Cassandra 导出到 BigQuery

python - 从整数列 PySpark 中删除字母

apache-spark - 将 Spark 数据帧转换为 Spark DenseMatrix 进行操作

Scala 线程池 - 同时调用 API

python - 为 pyspark 运行 nosetests

amazon-web-services - 使用 CLI 将逗号分隔的参数传递给 AWS EMR 中的 spark jar

python - 是否可以在docker下运行spark udf函数(主要是python)?

java - 如何从 Spark DataFrame 解析具有自定义 json 格式的列

Java Spark DataFrame 连接包含数组的列