scala - 在 HDFS 上预组合表并在 Spark 中读取零混洗

标签 scala apache-spark hadoop apache-spark-sql

上下文

我有两个表,作为我的 spark 作业的一部分,我正在加入/联合分组,这在我每次运行作业时都会导致大量洗牌。我想通过一次存储联合分组数据来分摊所有作业的成本,并将已经联合分组的数据用作我的常规 Spark 运行的一部分以避免混洗。

为了尝试实现这一点,我在 HDFS 中以 Parquet 格式存储了一些数据。我正在使用 Parquet 重复字段来实现以下架构

(date, [aRecords], [bRecords])

其中[aRecords]表示aRecord数组。我还使用通常的 write.partitionBy($"date") 在 HDFS 上按日期对数据进行分区。

在这种情况下,aRecords 和 bRecords 似乎按日期有效地组合在一起。我可以执行如下操作:

case class CogroupedData(date: Date, aRecords: Array[Int], bRecords: Array[Int])

val cogroupedData = spark.read.parquet("path/to/data").as[CogroupedData]

//Dataset[(Date,Int)] where the Int in the two sides multiplied
val results = cogroupedData
    .flatMap(el => el.aRecords.zip(el.bRecords).map(pair => (el.date, pair._1 * pair._2)))

并获得我通过对按日期键入的 aRecords 和 bRecords 的两个单独表使用等效 groupByKey 操作获得的结果。

两者之间的区别在于我避免了对已经联合分组的数据进行洗牌,联合分组的成本通过在 HDFS 上持久化来分摊。

问题

现在开始提问。我想从联合分组的数据集中派生出两个分组的数据集,这样我就可以使用标准的 Spark SQL 运算符(如联合分组、联接等)而不会产生困惑。这似乎是可能的,因为第一个代码示例有效,但是当我加入/groupByKey/cogroup 等时,Spark 仍然坚持对数据进行哈希/洗牌。

采用以下代码示例。我希望有一种方法可以在执行连接时运行下面的内容而不会导致随机播放。

val cogroupedData = spark.read.parquet("path/to/data").as[CogroupedData]

val aRecords = cogroupedData
    .flatMap(cog => cog.aRecords.map(a => (cog.date,a)))
val bRecords = cogroupedData
    .flatMap(cog => cog.bRecords.map(b => (cog.date,b)))

val joined = aRecords.join(bRecords,Seq("date"))

查看文献,如果 cogroupedData 有一个已知的分区器,那么后面的操作不应该导致洗牌,因为它们可以使用 RDD 已经分区的事实并保留分区器。

我认为我需要实现的是获得一个带有已知分区程序的 cogroupedData Dataset/rdd,而不会导致洗牌。

我已经尝试过的其他事情:

  • Hive 元数据 - 适用于简单的连接,但仅优化初始连接而不是后续转换。 Hive 对协作组也完全没有帮助

有人有什么想法吗?

最佳答案

你在这里犯了两个错误。

正确的做法是:

  • 使用分桶。

    val n: Int
    someDF.write.bucketBy(n, "date").saveAsTable("df")
    
  • 放弃功能性 API 以支持 SQL API:

    import org.apache.spark.sql.functions.explode
    
    val df = spark.table("df")
    
    val adf = df.select($"date", explode($"aRecords").alias("aRecords"))
    val bdf = df.select($"date", explode($"bRecords").alias("bRecords"))
    
    adf.join(bdf, Seq("date"))
    

关于scala - 在 HDFS 上预组合表并在 Spark 中读取零混洗,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52006635/

相关文章:

java - 如何从 sbt 中发现定义类的库?

java - 线程 "main"org.apache.spark.sql.AnalysisException 中出现异常 : cannot resolve 'named_struct()' due to data type mismatch:

java - 在 Hadoop 中设置可写?

scala - 解决线程 “main”中的异常java.lang.NoClassDefFoundError:org/apache/hadoop/fs/FSDataInputStream

regex - Scala,正则表达式匹配忽略不必要的单词

scala - 大数据慢IO

apache-spark - 如何为 Spark Dataframe 创建自定义编写器?

json - 计算带有 Spark 条件的数据帧的行数

hadoop - 无法配置 Pig 将中间文件存储为 LZO 格式

scala - 我发现自己在大多数函数结束时反转累加器;我怎么能停下来?