我有两种情况,其中我有 23 GB
分区的 parquet
数据并读取一些列
& 缓存
预先触发一系列后续查询。
设置:
- 集群:12 节点 EMR
- Spark 版本:1.6
- Spark 配置:默认
- 运行配置:两种情况相同
案例 1:
val paths = Array("s3://my/parquet/path", ...)
val parqFile = sqlContext.read.parquet(paths:_*)
parqFile.registerTempTable("productViewBase")
val dfMain = sqlContext.sql("select guid,email,eventKey,timestamp,pogId from productViewBase")
dfMain.cache.count
从 SparkUI
中,读取的输入数据为 6.2 GB,缓存对象为 15.1 GB。
案例 1:
val paths = Array("s3://my/parquet/path", ...)
val parqFile = sqlContext.read.parquet(paths:_*)
parqFile.registerTempTable("productViewBase")
val dfMain = sqlContext.sql("select guid,email,eventKey,timestamp,pogId from productViewBase order by pogId")
dfMain.cache.count
从 SparkUI
中,读取的输入数据为 6.2 GB,缓存对象为 5.5 GB。
对此行为有任何解释或代码引用吗?
最佳答案
其实比较简单。正如您在 SQL 指南中所读:
Spark SQL can cache tables using an in-memory columnar format ... Spark SQL will scan only required columns and will automatically tune compression
排序列存储的好处是它很容易压缩典型数据。当你排序时,你会得到这些相似记录的 block ,这些 block 可以使用非常简单的技术(如 RLE)压缩在一起。 .
这是一个实际上在具有列式存储的数据库中经常使用的属性,因为它不仅在存储方面非常高效,而且在聚合方面也非常高效。
sql.execution.columnar.compression
涵盖了 Spark 柱状压缩的不同方面。包装,如您所见RunLengthEncoding
确实是可用的压缩方案之一。
所以这里有两 block :
Spark 可以调整压缩 method on the fly based on the statistics :
Spark SQL will automatically select a compression codec for each column based on statistics of the data.
排序可以将相似的记录聚集在一起,从而使压缩更加高效。
如果列之间存在一些相关性(如果不是这种情况?),即使是基于单个列的简单排序也会产生相对较大的影响并提高不同压缩方案的性能。
关于sql - Spark SQL : Cache Memory footprint improves with 'order by' ,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36237508/