c# - 从C#到SQL Server的批量插入策略

标签 c# sql-server bulkinsert sqlbulkcopy

在我们当前的项目中,客户会将复杂/嵌套消息的集合发送到我们的系统。这些消息的频率约为。 1000-2000 msg/每秒。

这些复杂的对象包含事务数据(要添加)和主数据(如果找不到则将添加)。但是,客户没有传递主数据的ID,而是传递了“名称”列。

系统检查这些名称是否存在主数据。如果找到,它将使用数据库中的ID,否则先创建此主数据,然后再使用这些ID。

解析主数据ID后,系统会将交易数据插入SQL Server数据库(使用主数据ID)。每封邮件的主实体数量约为15-20。

以下是我们可以采用的一些策略。

  • 我们可以首先从C#代码中解析主ID(如果找不到主ID,则插入主数据),然后将这些ID存储在C#缓存中。解析完所有ID后,我们可以使用SqlBulkCopy类批量插入交易数据。我们可以访问数据库15次以获取不同实体的ID,然后再访问数据库一次以插入最终数据。我们可以使用相同的连接在完成所有这些处理后将其关闭。
  • 我们可以将所有包含主数据和事务数据的消息一次性发送到数据库(以多个TVP的形式),然后发送到存储过程中,首先为丢失的数据创建主数据,然后插入事务数据。

  • 在这个用例中,有人可以建议最好的方法吗?

    由于某些隐私问题,我无法共享实际的对象结构。但是,这里是假设的对象结构,它非常接近我们的业务对象。

    其中一条消息将包含有关来自不同供应商的一种产品的信息(其主数据)及其价格详细信息(交易数据):

    主数据(如果找不到,则需要添加)

    产品名称:ABC,产品类别:XYZ,制造商:XXX和其他一些详细信息(属性数量在15到20的范围内)。

    交易数据(将始终添加)

    供应商名称:A,标价:XXX,折扣:XXX

    供应商名称:B,标价:XXX,折扣:XXX

    供应商名称:C,标价:XXX,折扣:XXX

    供应商名称:D,标价:XXX,折扣:XXX

    对于一条消息,它属于一种产品(关于该消息的更改不会经常发生),有关主数据的大多数信息将保持不变,但交易数据将始终波动。因此,系统将检查系统中是否存在产品“XXX”。如果不是,请检查是否存在此产品提及的“类别”。如果没有,它将为类别和产品插入新记录。对于制造商和其他主数据,将执行此操作。

    多个供应商将同时发送有关多个产品(2000-5000)的数据。

    因此,假设我们有1000个供应商,每个供应商正在发送有关10-15种不同产品的数据。每隔2-3秒,每个供应商都会向我们发送这10种产品的价格更新。他可能会开始发送有关新产品的数据,但是这种数据不会很频繁。

    最佳答案

    #2想法可能是最好的选择(即使用多个TVP一次将所有15-20个实体发送到数据库,并处理多达2000条消息的整个集合)。

    在应用程序层缓存主数据查找并在发送到数据库之前进行翻译听起来不错,但会遗漏一些东西:

  • 无论如何,您将必须访问数据库才能获得初始列表
  • 无论如何,您将不得不点击数据库以插入新条目。
  • 在字典中查找要替换为ID的值正是数据库所要做的(假定在每个从名称到ID的查找中都使用非聚集索引)
  • 经常查询的值会将其数据页缓存在缓冲池(这是内存缓存)中。

  • 为什么要在应用程序层复制已经提供并在数据库层立即发生的事情,特别是考虑到以下情况:
  • 15至20个实体最多可具有20k条记录(这是一个相对较小的记录,尤其是考虑到非聚集索引仅需要两个字段时:NameID可以将许多行打包到单个数据页中)使用100%填充因子)。
  • 并非所有20k条目都是“ Activity ”或“当前”的,因此您不必担心将它们全部缓存。因此,无论当前值是什么,都可以很容易地将其标识为要查询的值,而那些数据页(可能包括一些不 Activity 的条目,但在那里没什么大不了的)将成为要缓存在缓冲池中的值。

  • 因此,您不必担心由于可能自然而然地更改值(例如,特定Name的已更新ID)而可能会导致值更改(例如,更新旧的SqlBulkCopy),从而使旧的条目变旧或强制 key 过期或重新加载。

    是的,内存缓存是一项很棒的技术,可以极大地加快网站访问速度,但是这些方案/用例适用于非数据库进程出于纯粹的只读目的一遍又一遍地请求相同数据的情况。但是,在这种特殊情况下,数据将被合并,并且查找值的列表可能会频繁更改(而且,这是由于新条目而不是更新条目)。

    话虽这么说,选择#2是必经之路。尽管没有15个TVP,但我已经多次成功地完成了这项技术。可能需要对该方法进行一些优化/调整以调整此特定情况,但是我发现效果很好的是:
  • 通过TVP接受数据。与DataTable相比,我更喜欢这样做,因为:
  • ,它使存储过程易于自包含
  • 非常适合应用程序代码,以将集合完全流式传输到数据库,而无需先将集合复制到IEnumerable<SqlDataRecord>中,这会复制集合,这会浪费CPU和内存。这要求您为每个集合创建一个返回yield return;的方法,接受该集合作为输入,并使用forforeachTOP (@RecordCount)循环中发送每个记录。
  • TVP不适用于统计信息,因此也不适用于JOINing(尽管可以通过在查询中使用[Name]来缓解这种情况),但是您不必担心,因为它们仅用于填充实际表缺少任何值
  • 步骤1:为每个实体插入缺少的名称。请记住,每个实体的[Name]字段上应该有一个非聚集索引,并且假设ID是聚集索引,则该值自然会成为索引的一部分,因此INSERT...SELECT除了会帮助您以下操作。并且还请记住,此客户端的任何先前执行(即大致相同的实体值)都将导致这些索引的数据页保留在缓冲池(即内存)中。

    ;WITH cte AS
    (
      SELECT DISTINCT tmp.[Name]
      FROM   @EntityNumeroUno tmp
    )
    INSERT INTO EntityNumeroUno ([Name])
      SELECT cte.[Name]
      FROM   cte
      WHERE  NOT EXISTS(
                     SELECT *
                     FROM   EntityNumeroUno tab
                     WHERE  tab.[Name] = cte.[Name]
                       )
    
  • 步骤2:由于步骤1
  • ,以简单的SqlBulkCopy插入所有“消息”,其中用于查找表(即“实体”)的数据页已缓存在缓冲池中


    最后,请记住,猜想/假设/有根据的猜测不能替代测试。您需要尝试一些方法来查看哪种方法最适合您的特定情况,因为可能还有一些其他细节尚未共享,这些细节可能会影响此处的“理想”条件。

    我会说,如果消息仅是插入的,那么弗拉德的想法可能会更快。我在这里描述的方法是在更复杂且需要完全同步(更新和删除)的情况下使用的,并进行了其他验证和相关操作数据(而不是查找值)的创建。在直接插入上使用SqlBulkCopy可能会更快(尽管对于仅2000条记录,我怀疑是否存在太多差异),但这是假设您直接将其加载到目标表(消息和查找),而不是直接加载到中间表/临时表(而且我相信Vlad的想法是直接将SendRows链接到目标表)。但是,如上所述,由于更新查找值的问题,使用外部高速缓存(即不使用缓冲池)也更容易出错。使外部高速缓存失效可能需要花费更多的代码,而不是值得花的钱,特别是如果使用外部高速缓存仅稍微快一点的话。需要综合考虑哪种额外的风险/维护方法才能更好地满足您的需求。

    更新

    根据评论中提供的信息,我们现在知道:
  • 有多个供应商
  • 每个供应商提供多种产品
  • 产品并非供应商独有;产品由1个或多个供应商出售
  • 产品属性为单数
  • 定价信息具有可以包含多个记录的属性
  • 定价信息仅是INSERT(即时间点历史记录)
  • 唯一产品由SKU(或类似字段)决定
  • 创建后,带有现有SKU但其他属性不同的产品(例如类别,制造商等)将被视为同一产品;差异将被忽略

  • 考虑到所有这些,我仍然会推荐TVP,但是要重新考虑这种方法,并使之成为以供应商为中心,而不是以产品为中心。这里的假设是供应商随时发送文件。因此,当您获取文件时,将其导入。您将要提前进行的唯一查找是供应商。这是基本布局:
  • 在这一点上假设您已经具有VendorID似乎是合理的,因为系统为什么要从未知来源导入文件?
  • 您可以批量导入
  • 创建一个int BatchSize方法,该方法:
  • 接受FileStream或允许通过文件
  • 前进的内容
  • 接受类似IEnumerable<SqlDataRecord>的内容
  • 返回SqlDataRecord
  • 创建一个SqlDataRecord以匹配TVP结构
  • for循环通过FileStream,直到达到BatchSize或File
  • 中没有更多记录为止
  • 对数据
  • 执行任何必要的验证
  • 将数据映射到yield return;
  • 调用SendRows(FileStream, BatchSize)
  • 打开文件
  • 文件中有数据时
  • 调用存储的过程
  • 在VendorID中传递
  • 传递给
  • 为TVP
  • 输入IEnumerable<SqlDataRecord>
  • 关闭文件
  • 实验:
  • 在围绕FileStream的循环之前打开SqlConnection,并在循环完成后关闭它
  • 打开SqlConnection,执行存储过程,然后在FileStream循环中关闭SqlConnection
  • 使用各种BatchSize值进行试验。从100开始,然后从200、500等开始。
  • 存储的proc将处理插入新产品

  • 使用这种类型的结构,您将发送未使用的产品属性(即,仅SKU用于查找现有产品)。但是,它可以很好地扩展,因为文件大小没有上限。如果供应商发送了50种产品,则可以。如果他们发送50k产品,则可以。如果他们发送了400万个产品(这是我正在使用的系统,并且确实处理了更新的产品信息,而该信息因其任何属性而异!),那就很好了。应用程序层或数据库层的内存没有增加,无法处理1000万个产品。导入所需的时间应与发送的产品数量同步增加。

    更新2
    与源数据有关的新详细信息:
  • 来自Azure EventHub
  • 以C#对象的形式出现(无文件)
  • 产品详细信息通过O.P.系统的API传入
  • 收集在单个队列中(只需将数据拉出插入数据库即可)

  • 如果数据源是C#对象,那么我肯定会使用TVP,因为您可以通过我在第一次更新中描述的方法(即返回ojit_code的方法)按原样发送它们。发送一个或多个TVP,以获取每个供应商的价格/报价详细信息,但发送常规输入参数以获取单个属性属性。例如:

    CREATE PROCEDURE dbo.ImportProduct
    (
      @SKU             VARCHAR(50),
      @ProductName     NVARCHAR(100),
      @Manufacturer    NVARCHAR(100),
      @Category        NVARCHAR(300),
      @VendorPrices    dbo.VendorPrices READONLY,
      @DiscountCoupons dbo.DiscountCoupons READONLY
    )
    SET NOCOUNT ON;
    
    -- Insert Product if it doesn't already exist
    IF (NOT EXISTS(
             SELECT  *
             FROM    dbo.Products pr
             WHERE   pr.SKU = @SKU
                  )
       )
    BEGIN
      INSERT INTO dbo.Products (SKU, ProductName, Manufacturer, Category, ...)
      VALUES (@SKU, @ProductName, @Manufacturer, @Category, ...);
    END;
    
    ...INSERT data from TVPs
    -- might need OPTION (RECOMPILE) per each TVP query to ensure proper estimated rows
    

    关于c# - 从C#到SQL Server的批量插入策略,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28664844/

    相关文章:

    c# - 你能在 C# 中将对象转换为 long 吗?

    c# - 最大化/最小化时,如何调整 TextBox 的大小以适应窗口大小?

    sql - 重命名表列并将其传播到依赖 View

    c# - 使用 QueryCommand 对象批量插入

    sql-server - 如何在 OPENROWSET(BULK...) 中动态指定文件路径?

    c# - 如何告诉 Entity Framework Function Import 存储过程返回的列不可为空?

    c# - 如何仅使用跟踪的更改来压缩文件?

    sql-server - SQL Server 2008 - 确定哪个 SQL 代理作业正在运行存储过程

    sql-server - sql server 查询中的明智移位日期时间检查

    c# - 在生产应用程序中使用 SqlBulkInsert