sql - DBMS 级别的管道和过滤器 : Splitting the MERGE output stream

标签 sql sql-server tsql azure-sql-database sql-merge

设想

我们有一个非常标准的数据导入过程,我们在其中加载staging表,然后 MERGE它变成了 target table 。

新要求(绿色)涉及捕获导入数据的子集
成一个单独的 queue表进行完全不相关的处理。

Scenario schema

“挑战”

(1) 子集由一组记录组成:那些
新插入target仅表。

(2) 子集是一些插入列的投影,但也
至少一列仅存在于源中(staging table )。

(3) MERGE语句已经使用了 OUTPUT..INTO条款
严格记录$action s 由 MERGE 拍摄,这样我们就可以PIVOT结果和COUNT插入次数、更新次数和
用于统计目的的删除。我们真的不喜欢缓冲
像这样对整个数据集进行操作,并且更喜欢聚合
即时的总和。不用说,我们不想添加更多数据到
OUTPUT table 。

(4) 我们不想做 MERGE 的匹配工作
无论出于何种原因,即使是部分原因,第二次执行。这target表真的很大,我们不能索引所有的东西,而且
操作通常非常昂贵(几分钟,而不是几秒钟)。

(5) 我们不考虑对 MERGE 的任何输出进行往返。到
客户端只是为了让客户端可以将其路由到 queue经过
立即寄回。数据必须留在服务器上。

(6) 我们希望避免在临时存储中缓冲整个数据集
之间stagingqueue .

最好的方法是什么?

失败

(a) 仅将插入的记录入队的要求阻止了我们
从定位到 queue表直接在 OUTPUT..INTO的条款MERGE ,因为它不允许任何 WHERE条款。我们可以使用一些CASE标记不需要的记录以供后续删除的技巧
来自 queue没有处理,但这似乎很疯狂。

(b) 因为有些列是为 queue 准备的不要出现在target表,我们不能简单地在目标上添加插入触发器
表加载 queue . “数据流拆分”必须更快发生。

(c) 因为我们已经使用了 OUTPUT..INTO MERGE中的条款, 我们
无法添加第二个 OUTPUT子句和嵌套 MERGEINSERT..SELECT加载队列。这是一种耻辱,因为它
感觉像是对有用的东西的完全任意限制
否则很好; SELECT仅过滤具有$action我们想要 ( INSERT ) 和 INSERT他们在 queue在一个
陈述。因此,DBMS 理论上可以避免缓冲整个
数据集并简单地将其流式传输到 queue . (注:我们没有追求
并且很可能它实际上并没有以这种方式优化计划。)

情况

我们觉得我们已经用尽了我们的选择,但决定转向 hive 思维
为了确定。我们能想到的只有:

(S1) 创建 VIEWtarget还包含可为空的表
用于 queue 的数据的列只有,并拥有SELECT语句将它们定义为 NULL .然后,设置 INSTEAD OF填充 target 的触发器表和queue适本地。最后,连接 MERGE以定位 View 。这个
有效,但我们不是这个结构的粉丝——它绝对是
看起来很棘手。

(S2) 放弃,使用临时表缓冲整个数据集
另一个 MERGE..OUTPUT .后 MERGE , 立即复制数据
(再次!)从临时表到 queue .

最佳答案

我的理解是,主要障碍是 OUTPUT 的限制。 SQL Server 中的子句。它允许一个 OUTPUT INTO table和/或一个 OUTPUT将结果集返回给调用者。

你想保存MERGE的结果以两种不同的方式声明:

  • MERGE 影响的所有行用于收集统计信息
  • 仅插入行 queue


  • 简单变体

    我会使用你的 S2 解决方案。至少开始。它易于理解和维护,并且应该非常高效,因为最耗费资源的操作(MERGETarget 本身只会执行一次)。下面有第二个变体,比较它们在真实数据上的表现会很有趣。

    所以:
  • 使用 OUTPUT INTO @TempTableMERGE
  • 要么INSERT来自 @TempTable 的所有行进入 Stats或在插入前聚合。如果您只需要汇总统计数据,则可以汇总该批次的结果并将其合并到最终的 Stats 中。而不是复制所有行。
  • INSERT进入 Queue只有来自 @TempTable 的“插入”行.

  • 我将从@i-one 的答案中获取样本数据。

    架构
    -- I'll return to commented lines later
    
    CREATE TABLE [dbo].[TestTarget](
        -- [ID] [int] IDENTITY(1,1) NOT NULL,
        [foo] [varchar](10) NULL,
        [bar] [varchar](10) NULL
    );
    
    CREATE TABLE [dbo].[TestStaging](
        [foo] [varchar](10) NULL,
        [bar] [varchar](10) NULL,
        [baz] [varchar](10) NULL
    );
    
    CREATE TABLE [dbo].[TestStats](
        [MergeAction] [nvarchar](10) NOT NULL
    );
    
    CREATE TABLE [dbo].[TestQueue](
        -- [TargetID] [int] NOT NULL,
        [foo] [varchar](10) NULL,
        [baz] [varchar](10) NULL
    );
    

    样本数据
    TRUNCATE TABLE [dbo].[TestTarget];
    TRUNCATE TABLE [dbo].[TestStaging];
    TRUNCATE TABLE [dbo].[TestStats];
    TRUNCATE TABLE [dbo].[TestQueue];
    
    INSERT INTO [dbo].[TestStaging]
        ([foo]
        ,[bar]
        ,[baz])
    VALUES
        ('A', 'AA', 'AAA'),
        ('B', 'BB', 'BBB'),
        ('C', 'CC', 'CCC');
    
    INSERT INTO [dbo].[TestTarget]
        ([foo]
        ,[bar])
    VALUES
        ('A', 'A_'),
        ('B', 'B?');
    

    合并
    DECLARE @TempTable TABLE (
        MergeAction nvarchar(10) NOT NULL,
        foo varchar(10) NULL,
        baz varchar(10) NULL);
    
    MERGE INTO TestTarget AS Dst
    USING TestStaging AS Src
    ON Dst.foo = Src.foo
    WHEN MATCHED THEN
    UPDATE SET
        Dst.bar = Src.bar
    WHEN NOT MATCHED BY TARGET THEN
    INSERT (foo, bar)
    VALUES (Src.foo, Src.bar)
    OUTPUT $action AS MergeAction, inserted.foo, Src.baz
    INTO @TempTable(MergeAction, foo, baz)
    ;
    
    INSERT INTO [dbo].[TestStats] (MergeAction)
    SELECT T.MergeAction
    FROM @TempTable AS T;
    
    INSERT INTO [dbo].[TestQueue]
        ([foo]
        ,[baz])
    SELECT
        T.foo
        ,T.baz
    FROM @TempTable AS T
    WHERE T.MergeAction = 'INSERT'
    ;
    
    SELECT * FROM [dbo].[TestTarget];
    SELECT * FROM [dbo].[TestStats];
    SELECT * FROM [dbo].[TestQueue];
    

    结果
    TestTarget
    +-----+-----+
    | foo | bar |
    +-----+-----+
    | A   | AA  |
    | B   | BB  |
    | C   | CC  |
    +-----+-----+
    
    TestStats
    +-------------+
    | MergeAction |
    +-------------+
    | INSERT      |
    | UPDATE      |
    | UPDATE      |
    +-------------+
    
    TestQueue
    +-----+-----+
    | foo | baz |
    +-----+-----+
    | C   | CCC |
    +-----+-----+
    

    第二种变体

    在 SQL Server 2014 Express 上测试。
    OUTPUT子句可以将其结果集发送到表和调用者。所以,OUTPUT INTO可以进Stats如果我们直接包装 MERGE语句转换成存储过程,然后我们可以使用INSERT ... EXECQueue .

    如果您检查执行计划,您会看到 INSERT ... EXEC无论如何都会在幕后创建一个临时表(另见 The Hidden Costs of INSERT EXEC by
    Adam Machanic),所以我希望当您显式创建临时表时,整体性能将类似于第一个变体。

    还有一个问题要解决:Queue表应该只有“插入”行,而不是所有受影响的行。要实现这一点,您可以在 Queue 上使用触发器。表以丢弃“插入”以外的行。另一种可能性是使用 IGNORE_DUP_KEY = ON 定义唯一索引。并以这样一种方式准备数据,即“非插入”行将违反唯一索引并且不会插入到表中。

    所以,我将添加一个 ID IDENTITY列到 Target表,我会添加一个 TargetID列到 Queue table 。 (在上面的脚本中取消注释它们)。
    另外,我将为 Queue 添加一个索引。 table :
    CREATE UNIQUE NONCLUSTERED INDEX [IX_TargetID] ON [dbo].[TestQueue]
    (
        [TargetID] ASC
    ) WITH (
    PAD_INDEX = OFF, 
    STATISTICS_NORECOMPUTE = OFF, 
    SORT_IN_TEMPDB = OFF, 
    IGNORE_DUP_KEY = ON, 
    DROP_EXISTING = OFF, 
    ONLINE = OFF, 
    ALLOW_ROW_LOCKS = ON, 
    ALLOW_PAGE_LOCKS = ON)
    

    重要的部分是UNIQUEIGNORE_DUP_KEY = ON .

    这是 MERGE 的存储过程:
    CREATE PROCEDURE [dbo].[TestMerge]
    AS
    BEGIN
        SET NOCOUNT ON;
        SET XACT_ABORT ON;
    
        MERGE INTO dbo.TestTarget AS Dst
        USING dbo.TestStaging AS Src
        ON Dst.foo = Src.foo
        WHEN MATCHED THEN
        UPDATE SET
            Dst.bar = Src.bar
        WHEN NOT MATCHED BY TARGET THEN
        INSERT (foo, bar)
        VALUES (Src.foo, Src.bar)
        OUTPUT $action INTO dbo.TestStats(MergeAction)
        OUTPUT CASE WHEN $action = 'INSERT' THEN inserted.ID ELSE 0 END AS TargetID, 
        inserted.foo,
        Src.baz
        ;
    
    END
    

    用法
    TRUNCATE TABLE [dbo].[TestTarget];
    TRUNCATE TABLE [dbo].[TestStaging];
    TRUNCATE TABLE [dbo].[TestStats];
    TRUNCATE TABLE [dbo].[TestQueue];
    
    -- Make sure that `Queue` has one special row with TargetID=0 in advance.
    INSERT INTO [dbo].[TestQueue]
        ([TargetID]
        ,[foo]
        ,[baz])
    VALUES
        (0
        ,NULL
        ,NULL);
    
    INSERT INTO [dbo].[TestStaging]
        ([foo]
        ,[bar]
        ,[baz])
    VALUES
        ('A', 'AA', 'AAA'),
        ('B', 'BB', 'BBB'),
        ('C', 'CC', 'CCC');
    
    INSERT INTO [dbo].[TestTarget]
        ([foo]
        ,[bar])
    VALUES
        ('A', 'A_'),
        ('B', 'B?');
    
    INSERT INTO [dbo].[TestQueue]
    EXEC [dbo].[TestMerge];
    
    SELECT * FROM [dbo].[TestTarget];
    SELECT * FROM [dbo].[TestStats];
    SELECT * FROM [dbo].[TestQueue];
    

    结果
    TestTarget
    +----+-----+-----+
    | ID | foo | bar |
    +----+-----+-----+
    |  1 | A   | AA  |
    |  2 | B   | BB  |
    |  3 | C   | CC  |
    +----+-----+-----+
    
    TestStats
    +-------------+
    | MergeAction |
    +-------------+
    | INSERT      |
    | UPDATE      |
    | UPDATE      |
    +-------------+
    
    TestQueue
    +----------+------+------+
    | TargetID | foo  | baz  |
    +----------+------+------+
    |        0 | NULL | NULL |
    |        3 | C    | CCC  |
    +----------+------+------+
    
    INSERT ... EXEC期间会有额外提示:
    Duplicate key was ignored.
    

    如果 MERGE更新了一些行。当唯一索引在 INSERT 期间丢弃某些行时发送此警告消息。由于 IGNORE_DUP_KEY = ON .

    A warning message will occur when duplicate key values are inserted into a unique index. Only the rows violating the uniqueness constraint will fail.

    关于sql - DBMS 级别的管道和过滤器 : Splitting the MERGE output stream,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34533204/

    相关文章:

    java - 从 Java 运行 Oracle 查询

    Mysql触发器问题(我认为它正在触发)

    sql-server - 如何在创建过程或函数之前终止 SQL 脚本

    sql - 使用INSERT-OUTPUT为另一个INSERT提供值

    mysql - Django - 查询外键 ID 时避免连接?

    mysql - 帮忙查询

    SQL 开发人员导入数据向导显示为空白

    sql-server - 检查行数据是否已更改

    SQL 左外连接不提供完整表

    sql-server - 使用 cte 按顺序更新不起作用,为什么?