apache-spark - 用于高效加入 Spark 数据帧/数据集的分区数据

标签 apache-spark apache-spark-sql spark-dataframe partitioning apache-spark-dataset

我需要join基于一些共享键列的许多 DataFrames 在一起。对于键值 RDD,可以指定一个分区器,以便将具有相同键的数据点混洗到同一个执行器,从而提高连接效率(如果在 join 之前有混洗相关操作)。可以在 Spark DataFrames 或 DataSets 上做同样的事情吗?

最佳答案

您可以 repartition加载后的 DataFrame,如果您知道您将多次加入它

val users = spark.read.load("/path/to/users").repartition('userId)

val joined1 = users.join(addresses, "userId")
joined1.show() // <-- 1st shuffle for repartition

val joined2 = users.join(salary, "userId")
joined2.show() // <-- skips shuffle for users since it's already been repartitioned

所以它会洗牌一次数据,然后在加入后续时间时重用洗牌文件。

但是,如果您知道将重复混洗某些键上的数据,那么最好的办法是将数据保存为分桶表。这会将已预先哈希分区的数据写出,因此当您读入表并加入它们时,您可以避免洗牌。你可以这样做:
// you need to pick a number of buckets that makes sense for your data
users.bucketBy(50, "userId").saveAsTable("users")
addresses.bucketBy(50, "userId").saveAsTable("addresses")

val users = spark.read.table("users")
val addresses = spark.read.table("addresses")

val joined = users.join(addresses, "userId")
joined.show() // <-- no shuffle since tables are co-partitioned

为了避免洗牌,表必须使用相同的分桶(例如,相同数量的桶并加入桶列)。

关于apache-spark - 用于高效加入 Spark 数据帧/数据集的分区数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48160627/

相关文章:

apache-spark - 有没有办法在Java Spark 2.1中进行广播联接

java - 有什么办法可以扁平化 Spark Streaming 中的嵌套 JSON 吗?

apache-spark - 属性错误 : 'NoneType' object has no attribute 'write in Pyspark

scala - 如何从Spark中的多个Elasticsearch索引读取?

scala - 从 SFTP 服务器加载文件到 spark RDD

json - 从Spark Scala中的序列文件中提取JSON记录

python - 了解 Spark 执行中的 DAG

python - Spark join 抛出 'function' object has no attribute '_get_object_id' 错误。我该如何解决?

cassandra - 如何在 shell 中加载 Spark Cassandra Connector?

scala - 如何确保我的DataFrame释放其内存?