apache-spark - 在 delta lake 中高效读取/转换分区数据

标签 apache-spark pyspark apache-spark-sql delta-lake

我在 ADLS 的三角洲湖中有我的数据,并通过 Databricks 读取它。数据按年和日期分区,z 按 storeIdNum 排序,其中大约有 10 个商店 ID #s,每个日期有几百万行。当我阅读它时,有时我正在读取一个日期分区(约 2000 万行),有时我正在读取整整一个月或一年的数据以进行批处理操作。我有第二个小得多的表,每个日期大约有 75,000 行,它也是 z 按 storeIdNum 排序的,我的大部分操作涉及将较大的数据表连接到 storeIdNum 上的较小表(以及一些其他字段 - 如时间窗口,较小的表是按小时汇总的,另一个表每秒都有数据点)。当我读入表格时,我加入它们并执行一系列操作(分组依据、窗口依据和分区依据以及滞后/领先/平均/密集排名函数等)。

我的问题是:我是否应该在所有连接、分组依据和分区依据语句中包含日期?每当我读取数据的一个日期时,我总是在其中包含年份和日期读取数据的语句,据我所知,我只想从某个分区(或一年的分区)读取数据,但引用分区 col 也很重要。在 Windows 和组总线中提高效率,或者这是多余的?在分析/转换之后,我不会覆盖/修改我正在读取的数据,而是写入一个新表(可能分区在相同的列上),以防这是一个因素。

例如:

dfBig = spark.sql("SELECT YEAR, DATE, STORE_ID_NUM, UNIX_TS, BARCODE, CUSTNUM, .... FROM STORE_DATA_SECONDS WHERE YEAR = 2020 and DATE='2020-11-12'")
dfSmall = spark.sql("SELECT YEAR, DATE, STORE_ID_NUM, TS_HR, CUSTNUM, .... FROM STORE_DATA_HRS WHERE YEAR = 2020 and DATE='2020-11-12'")

现在,如果我加入他们,我是想在加入中包含 YEAR 和 DATE,还是应该只加入 STORE_ID_NUM(以及我需要加入的任何时间戳字段/客户 ID 号字段)?我绝对需要 STORE_ID_NUM,但如果它只是添加另一列并使其效率更低,我可以放弃 YEAR AND DATE,因为它有更多的东西要加入。我不知道它到底是如何工作的,所以我想通过前面的连接来检查,也许我在做操作时没有使用分区,所以效率降低了?谢谢!

最佳答案

delta 的关键是要很好地选择分区列,这可能需要一些试验和错误,如果你想优化响应的性能,我学到的一个技巧是选择基数较低的过滤列(你知道问题是否是时间序列的,它将是日期,另一方面,如果它是关于所有客户的报告,在这种情况下选择您的城市可能会很方便),请记住,如果您使用增量每个分区表示文件结构的级别,其基数将是目录的数量。

在你的情况下,我发现按 YEAR 分区很好,但我会添加 MONTH,因为记录的数量对 spark 的动态修剪有所帮助

您可以尝试的另一件事是,如果表与另一个表相比非常小,则使用 BRADCAST JOIN。

Broadcast Hash Join en Spark (ES)

Join Strategy Hints for SQL Queries

后一个链接解释了动态修剪如何帮助 MERGE 操作。

How to improve performance of Delta Lake MERGE INTO queries using partition pruning

关于apache-spark - 在 delta lake 中高效读取/转换分区数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64826581/

相关文章:

apache-spark - SPARK : one powerful machine Vs. 几台较小的机器

json - 保存DataFrame时如何避免生成crc文件和SUCCESS文件?

python - Pyspark - 计算每个数据框列中的空值数

apache-spark - 如何在 macOS Mojave 上使用 Pandas UDF? (由于 [__NSPlaceholderDictionary initialize] 可能正在进行中而失败...)

python - 无法在 Spark 上运行 TensorFlow

scala - 数据未加载到 Scala 中的表中

apache-spark - SparkConf作为Kubernetes ConfigMap

scala - map 功能中的条件

python - pyspark错误does not exist in jvm error when initializing SparkContext

scala - Spark 中的案例陈述