上下文
我有两个表,作为我的 spark 作业的一部分,我正在加入/联合分组,这在我每次运行作业时都会导致大量洗牌。我想通过一次存储联合分组数据来分摊所有作业的成本,并将已经联合分组的数据用作我的常规 Spark 运行的一部分以避免混洗。
为了尝试实现这一点,我在 HDFS 中以 Parquet 格式存储了一些数据。我正在使用 Parquet 重复字段来实现以下架构
(date, [aRecords], [bRecords])
其中[aRecords]表示aRecord数组。我还使用通常的 write.partitionBy($"date")
在 HDFS 上按日期对数据进行分区。
在这种情况下,aRecords 和 bRecords 似乎按日期有效地组合在一起。我可以执行如下操作:
case class CogroupedData(date: Date, aRecords: Array[Int], bRecords: Array[Int])
val cogroupedData = spark.read.parquet("path/to/data").as[CogroupedData]
//Dataset[(Date,Int)] where the Int in the two sides multiplied
val results = cogroupedData
.flatMap(el => el.aRecords.zip(el.bRecords).map(pair => (el.date, pair._1 * pair._2)))
并获得我通过对按日期键入的 aRecords 和 bRecords 的两个单独表使用等效 groupByKey 操作获得的结果。
两者之间的区别在于我避免了对已经联合分组的数据进行洗牌,联合分组的成本通过在 HDFS 上持久化来分摊。
问题
现在开始提问。我想从联合分组的数据集中派生出两个分组的数据集,这样我就可以使用标准的 Spark SQL 运算符(如联合分组、联接等)而不会产生困惑。这似乎是可能的,因为第一个代码示例有效,但是当我加入/groupByKey/cogroup 等时,Spark 仍然坚持对数据进行哈希/洗牌。
采用以下代码示例。我希望有一种方法可以在执行连接时运行下面的内容而不会导致随机播放。
val cogroupedData = spark.read.parquet("path/to/data").as[CogroupedData]
val aRecords = cogroupedData
.flatMap(cog => cog.aRecords.map(a => (cog.date,a)))
val bRecords = cogroupedData
.flatMap(cog => cog.bRecords.map(b => (cog.date,b)))
val joined = aRecords.join(bRecords,Seq("date"))
查看文献,如果 cogroupedData 有一个已知的分区器,那么后面的操作不应该导致洗牌,因为它们可以使用 RDD 已经分区的事实并保留分区器。
我认为我需要实现的是获得一个带有已知分区程序的 cogroupedData Dataset/rdd,而不会导致洗牌。
我已经尝试过的其他事情:
- Hive 元数据 - 适用于简单的连接,但仅优化初始连接而不是后续转换。 Hive 对协作组也完全没有帮助
有人有什么想法吗?
最佳答案
你在这里犯了两个错误。
如今(Spark 2.3)Spark 不使用分区信息进行分区修剪以外的查询优化。仅使用分桶。详情见Does Spark know the partitioning key of a DataFrame? .
结论:要有机会进行优化,您必须使用 Metastore 和分桶。
一般来说,Spark 无法优化“强类型”数据集上的操作。详情见Spark 2.0 Dataset vs DataFrame和 Why is predicate pushdown not used in typed Dataset API (vs untyped DataFrame API)?
正确的做法是:
使用分桶。
val n: Int someDF.write.bucketBy(n, "date").saveAsTable("df")
放弃功能性 API 以支持 SQL API:
import org.apache.spark.sql.functions.explode val df = spark.table("df") val adf = df.select($"date", explode($"aRecords").alias("aRecords")) val bdf = df.select($"date", explode($"bRecords").alias("bRecords")) adf.join(bdf, Seq("date"))
关于scala - 在 HDFS 上预组合表并在 Spark 中读取零混洗,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52006635/