我想使用 MERGE INTO
命令将数据从 Databricks 上的增量表加载到 Snowflake 上的表中。
目标是 Databricks 上的增量表中的记录数看起来与 Snowflake 上的表中的记录数相同。
出现的问题是,由于 Delta Lake(S3 路径)有多个版本,Snowflake 查询了重复的记录。
如何才能只读取最新版本的delta lake?
MERGE INTO myTable as target USING (
SELECT
$1:DAY::TEXT AS DAY,
$1:CHANNEL_CATEGORY::TEXT AS CHANNEL_CATEGORY,
$1:SOURCE::TEXT AS SOURCE,
$1:PLATFORM::TEXT AS PLATFROM,
$1:LOB::TEXT AS LOB
FROM @StageFilePathDeltaLake
(FILE_FORMAT => 'sf_parquet_format')
) as src
ON target.CHANNEL_CATEGORY = src.CHANNEL_CATEGORY
AND target.SOURCE = src.SOURCE
WHEN MATCHED THEN
UPDATE SET
DAY= src.DAY
,PLATFORM= src.PLATFORM
,LOB= src.LOB
WHEN NOT MATCHED THEN
INSERT (
DAY,
CHANNEL_CATEGORY,
SOURCE,
PLATFORM,
LOB
) VALUES (
src.DAY,
src.CHANNEL_CATEGORY,
src.SOURCE,
src.PLATFORM,
src.LOB
);
sf_parque_format 是用这些细节创建的:
create or replace file format sf_parquet_format
type = 'parquet'
compression = auto;
最佳答案
上面的问题是 Snowflake 将其读取为 Parquet 文件,而不是 Delta。
解决方案很简单,明确其 Delta 性质:使用上述阶段和 table_format = delta
创建外部表并从外部表查询而不是直接从阶段查询。
create external table …
location=@mystage/daily/
refresh_on_create = false
auto_refresh = false
file_format = (type = parquet)
table_format = delta; -- this one
作为我队友的补充说明:如果您想查看最新数据(如果 Delta Lake 已更新,则在创建外部表之后),您将需要执行 ALTER EXTERNAL TABLE {name} REFRESH
。您还可以在外部表的顶部创建一个仅插入流,但只会在发生刷新后填充。
关于snowflake-cloud-data-platform - 从 Snowflake 查询 delta lake 以读取最新版本,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/73381073/