apache-spark - 如何在 Spark SQL 中强制进行内存中的分块排序?

标签 apache-spark

Parquet 文件格式对记录的顺序很敏感。根据排序顺序,其柱状编码可能会产生明显更小的文件。 另一方面,对 TB 的输入记录进行排序非常昂贵。

假设 10GB 分成 block 允许内存中排序,同时生成几乎与整个 1TB 完全排序一样小的 Parquet 文件。

是否可以指示 Spark SQL 在生成 parquet 文件之前进行分块排序?

另一个用例是在编写统一的 Parquet 文件之前使用分块排序将许多小的 Parquet 文件合并为一个文件。

最佳答案

据我所知,Spark < 2.0.0 中没有这样的开箱即用选项。您可以尝试的一件事是在编写之前将 coalesce 与 Hive SORT BY 子句结合起来,这应该具有类似的效果:

val df: DataFrame = ???
val n: Int = ??? //

df.coalesce(n)
df.coalesce(n).registerTempTable("df")
sqlContext.sql("SELECT * FROM df SORT BY foo, bar").write.parquet(...)

df.coalesce(n).sortWithinPartitions($"foo", $"bar").write.parquet(...)

请记住,SORT BY 不等同于 DataFrame.sort

Spark 2.0.0 引入了 sortBybucketBy 方法,其中后者按给定的列对每个桶中的输出进行排序should support Parquet :

val df: DataFrame = ???
val nBuckets: Int = ???

df.write.bucketBy(nBuckets, "foo").sortBy("foo", "bar").saveAsTable(...)

注意:这似乎仅在使用 saveAsTable 保存 Parquet 文件时有效,但它看起来并不直接支持 parquet writer (df.write. spark-2.0.0-preview 中的 bucketBy(...).sortBy(...).parquet(...))。

关于apache-spark - 如何在 Spark SQL 中强制进行内存中的分块排序?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37640749/

相关文章:

scala - 如何在不使用 Scala 案例类的情况下为 CSV 文件指定架构?

scala - Spark : Explode a dataframe array of structs and append id

apache-spark - 在 PySpark 的文字列上检测到 INNER 连接的笛卡尔积

apache-spark - 在spark中获取嵌套的json对象

scala - DataFrame.count()== 0与DataFrame.rdd.isEmpty():哪个更好,为什么?

java - 如何使用 Spark 和 JavaRDD 检索特定行?

scala - 如何在 Apache ignite 中缓存 Dataframe

scala - Spark/Scala 中 array.map 和 rdd.map 有什么区别?

hadoop - Hadoop 2:使用自定义InputFormat时,结果为空

jdbc - 写入 JDBC 表的 SparkSQL SQL 查询是什么?