问题
我们在 ADLS Gen2 之上有一个 Delta Lake 设置,其中包含下表:
bronze.DeviceData
: 按到达日期 ( Partition_Date
) silver.DeviceData
:按事件日期和时间分区( Partition_Date
和 Partition_Hour
)我们从事件中心将大量数据(每天超过 6 亿条记录)摄取到
bronze.DeviceData
(仅附加)。然后我们以流方式处理新文件并将它们更新到 silver.DeviceData
使用 delta MERGE 命令(见下文)。到达铜牌表的数据可以包含来自任何银牌分区的数据(例如,设备可以发送它在本地缓存的历史数据)。但是,任何一天到达的>90% 的数据来自分区
Partition_Date IN (CURRENT_DATE(), CURRENT_DATE() - INTERVAL 1 DAYS, CURRENT_DATE() + INTERVAL 1 DAYS)
.因此,为了更新数据,我们有以下两个 spark 作业:现在我们来解决这个问题:虽然在“慢”工作中数据量少了很多,但它运行数天只是为了处理一天的慢青铜数据,有一个大集群。原因很简单:它必须读取和更新许多银分区(有时> 1000 个日期分区),并且由于更新很小但日期分区可能是千兆字节,因此这些合并命令效率低下。
而且,随着时间的推移,这个缓慢的工作会变得越来越慢,因为它接触到的银色分区会增长。
问题
附加信息
CREATE TABLE silver.DeviceData (
DeviceID LONG NOT NULL, -- the ID of the device that sent the data
DataType STRING NOT NULL, -- the type of data it sent
Timestamp TIMESTAMP NOT NULL, -- the timestamp of the data point
Value DOUBLE NOT NULL, -- the value that the device sent
UpdatedTimestamp TIMESTAMP NOT NULL, -- the timestamp when the value arrived in bronze
Partition_Date DATE NOT NULL, -- = TO_DATE(Timestamp)
Partition_Hour INT NOT NULL -- = HOUR(Timestamp)
)
USING DELTA
PARTITIONED BY (Partition_Date, Partition_Hour)
LOCATION '...'
val silverTable = DeltaTable.forPath(spark, silverDeltaLakeDirectory)
val batch = ... // the streaming update batch
// the dates and hours that we want to upsert, for partition pruning
// collected from the streaming update batch
val dates = "..."
val hours = "..."
val mergeCondition = s"""
silver.Partition_Date IN ($dates)
AND silver.Partition_Hour IN ($hours)
AND silver.Partition_Date = batch.Partition_Date
AND silver.Partition_Hour = batch.Partition_Hour
AND silver.DeviceID = batch.DeviceID
AND silver.Timestamp = batch.Timestamp
AND silver.DataType = batch.DataType
"""
silverTable.alias("silver")
.merge(batch.alias("batch"), mergeCondition)
// only merge if the event is newer
.whenMatched("batch.UpdatedTimestamp > silver.UpdatedTimestamp").updateAll
.whenNotMatched.insertAll
.execute
最佳答案
在 Databricks 上,有多种方法可以优化 merge into
的性能。手术:
OPTIMIZE
创建需要重写的 1Gb 文件。您可以使用 spark.databricks.delta.optimize.maxFileSize
将文件大小设置为 32Mb-64Mb 范围,以便重写更少的数据 此 presentation from Spark Summit谈优化
merge into
- 要观察的指标等。我不是 100% 确定您需要条件
silver.Partition_Date IN ($dates) AND silver.Partition_Hour IN ($hours)
因为如果传入数据中没有特定分区,您可能会读取比所需更多的数据,但它需要查看执行计划。此 knowledge base article解释如何确保 merge into
使用分区修剪。
关于scala - 在大量分区上处理 upsert 不够快,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/66659817/