我需要join
基于一些共享键列的许多 DataFrames 在一起。对于键值 RDD,可以指定一个分区器,以便将具有相同键的数据点混洗到同一个执行器,从而提高连接效率(如果在 join
之前有混洗相关操作)。可以在 Spark DataFrames 或 DataSets 上做同样的事情吗?
最佳答案
您可以 repartition
加载后的 DataFrame,如果您知道您将多次加入它
val users = spark.read.load("/path/to/users").repartition('userId)
val joined1 = users.join(addresses, "userId")
joined1.show() // <-- 1st shuffle for repartition
val joined2 = users.join(salary, "userId")
joined2.show() // <-- skips shuffle for users since it's already been repartitioned
所以它会洗牌一次数据,然后在加入后续时间时重用洗牌文件。
但是,如果您知道将重复混洗某些键上的数据,那么最好的办法是将数据保存为分桶表。这会将已预先哈希分区的数据写出,因此当您读入表并加入它们时,您可以避免洗牌。你可以这样做:
// you need to pick a number of buckets that makes sense for your data
users.bucketBy(50, "userId").saveAsTable("users")
addresses.bucketBy(50, "userId").saveAsTable("addresses")
val users = spark.read.table("users")
val addresses = spark.read.table("addresses")
val joined = users.join(addresses, "userId")
joined.show() // <-- no shuffle since tables are co-partitioned
为了避免洗牌,表必须使用相同的分桶(例如,相同数量的桶并加入桶列)。
关于apache-spark - 用于高效加入 Spark 数据帧/数据集的分区数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48160627/