我的数据集以这种方式分区:
Year=yyyy
|---Month=mm
| |---Day=dd
| | |---<parquet-files>
在两个日期之间加载数据的 spark 中创建数据帧的最简单有效的方法是什么?
最佳答案
如果你绝对要坚持这个分区策略,答案取决于你是否愿意承担分区发现成本。
如果您愿意让 Spark 发现所有分区,这只需发生一次(直到您添加新文件),您可以加载基本路径,然后使用分区列进行过滤。
如果您不希望 Spark 发现所有分区,例如,因为您有数百万个文件,那么唯一有效的通用解决方案是将您要查询的间隔分解为几个子间隔,您可以使用@r0bb23 的方法轻松查询然后联合然后在一起。
如果您想要上述两种情况的最佳选择并且您拥有稳定的架构,则可以通过定义外部分区表在 Metastore 中注册分区。如果您希望您的架构随着 Metastore 管理的表在此时非常糟糕地管理架构演变而发展,请不要这样做。
例如在2017-10-06
之间查询和 2017-11-03
你会这样做:
// With full discovery
spark.read.parquet("hdfs:///basepath")
.where('Year === 2017 && (
('Month === 10 && 'Day >= 6') || ('Month === 11 && 'Day <= 3')
))
// With partial discovery
val df1 = spark.read.option("basePath", "hdfs:///basepath/")
.parquet("hdfs:///basepath/Year=2017/Month=10/Day={0[6-9], [1-3][0-9]}/*/")
val df2 = spark.read.option("basePath", "hdfs:///basepath/")
.parquet("hdfs:///basepath/Year=2017/Month=11/Day={0[1-3]}/*/")
val df = df1.union(df2)
为此编写通用代码当然是可能的,但我还没有遇到过。更好的方法是按照我对问题发表的评论中概述的方式进行分区。如果你的表是使用类似
/basepath/ts=yyyymmddhhmm/*.parquet
的东西分区的那么答案很简单:spark.read.parquet("hdfs:///basepath")
.where('ts >= 201710060000L && 'ts <= 201711030000L)
值得添加小时和分钟的原因是,您可以编写处理间隔的通用代码,无论您是否按周、天、小时或每 15 分钟对数据进行分区。事实上,您甚至可以在同一个表中管理不同粒度的数据,例如,在更高级别聚合较旧的数据以减少需要发现的分区总数。
关于apache-spark - 使用日期范围对分区数据进行 Spark SQL 查询,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47191078/