.net - 如何在不创建大缓冲区的情况下将大型 .NET 对象图序列化为 SQL Server BLOB?

标签 .net sql-server serialization ado.net memory-management

我们有这样的代码:

ms = New IO.MemoryStream
bin = New System.Runtime.Serialization.Formatters.Binary.BinaryFormatter
bin.Serialize(ms, largeGraphOfObjects)
dataToSaveToDatabase = ms.ToArray()
// put dataToSaveToDatabase in a Sql server BLOB

但是内存流从大内存堆中分配了一个大缓冲区,这给我们带来了问题。那么我们如何在不需要足够的可用内存来保存序列化对象的情况下传输数据呢?

我正在寻找一种从 SQL Server 获取 Stream 的方法,然后将其传递给 bin.Serialize() ,从而避免将所有数据保留在我的进程内存中。

读回数据也是如此...

<小时/>

更多背景信息。

这是复杂的数字处理系统的一部分,该系统近乎实时地处理数据以查找设备问题等,完成序列化是为了在数据源等的数据质量出现问题时允许重新启动。(我们存储数据馈送,并可以在运算符(operator)编辑掉错误值后重新运行它们。)

因此,我们序列化对象的频率比反序列化它们的频率要高得多。

我们正在序列化的对象包括非常大的数组(大部分为 double )以及许多小的“更正常”对象。我们正在插入 32 位系统的内存限制,并使垃圾收集器非常努力地工作。 (系统中的其他地方正在采取措施来改善这一点,例如重用大型数组而不是创建新数组。)

状态的序列化通常是 last straw导致内存不足异常;我们的内存使用高峰总是在 此序列化步骤。

认为当我们反序列化对象时,我们会得到大的内存池碎片,考虑到数组的大小,我预计大内存池碎片还会存在其他问题。 (这还没有被调查,因为首先看到这个的人是数值处理专家,而不是内存管理专家。)

我们的客户混合使用 SQL Server 2000、2005 和 2008,如果可能的话,我们不希望每个版本的 SQL Server 使用不同的代码路径。

我们一次可以有许多事件模型(在不同的进程中,跨多台机器),每个模型可以有许多保存的状态。因此,保存的状态存储在数据库 blob 中,而不是文件中。

由于保存状态的传播很重要,我宁愿不将对象序列化到文件,然后将文件一次一个 block 地放入 BLOB 中。

我问过的其他相关问题

最佳答案

没有内置的 ADO.Net 功能可以真正优雅地处理大数据。问题有两个:

  • 没有 API 可以像写入流一样“写入”SQL 命令或参数。接受流的参数类型(例如FileStream)接受流以从中READ,这与write的序列化语义不一致成一条流。无论你以哪种方式改变它,你最终都会得到整个序列化对象的内存副本,这很糟糕。
  • 即使上述问题能够得到解决(而且不可能),TDS 协议(protocol)和 SQL Server 接受参数的方式也不能很好地处理大参数,因为在启动执行之前必须先接收整个请求这将在 SQL Server 内创建该对象的附加副本。

所以你真的必须从不同的角度来处理这个问题。幸运的是,有一个相当简单的解决方案。诀窍是使用高效的 UPDATE .WRITE 语法,并在一系列 T-SQL 语句中逐个传入数据 block 。这是MSDN推荐的方式,参见Modifying Large-Value (max) Data in ADO.NET 。这看起来很复杂,但实际上做起来很简单,并将其插入 Stream 类中。

<小时/>

BlobStream 类

这是解决方案的基础。 Stream 派生类,将 Write 方法实现为对 T-SQL BLOB WRITE 语法的调用。直接来说,唯一有趣的是它必须跟踪第一次更新,因为 UPDATE ... SET blob.WRITE(...) 语法在 NULL 字段上会失败:

class BlobStream: Stream
{
    private SqlCommand cmdAppendChunk;
    private SqlCommand cmdFirstChunk;
    private SqlConnection connection;
    private SqlTransaction transaction;

    private SqlParameter paramChunk;
    private SqlParameter paramLength;

    private long offset;

    public BlobStream(
        SqlConnection connection,
        SqlTransaction transaction,
        string schemaName,
        string tableName,
        string blobColumn,
        string keyColumn,
        object keyValue)
    {
        this.transaction = transaction;
        this.connection = connection;
        cmdFirstChunk = new SqlCommand(String.Format(@"
UPDATE [{0}].[{1}]
    SET [{2}] = @firstChunk
    WHERE [{3}] = @key"
            ,schemaName, tableName, blobColumn, keyColumn)
            , connection, transaction);
        cmdFirstChunk.Parameters.AddWithValue("@key", keyValue);
        cmdAppendChunk = new SqlCommand(String.Format(@"
UPDATE [{0}].[{1}]
    SET [{2}].WRITE(@chunk, NULL, NULL)
    WHERE [{3}] = @key"
            , schemaName, tableName, blobColumn, keyColumn)
            , connection, transaction);
        cmdAppendChunk.Parameters.AddWithValue("@key", keyValue);
        paramChunk = new SqlParameter("@chunk", SqlDbType.VarBinary, -1);
        cmdAppendChunk.Parameters.Add(paramChunk);
    }

    public override void Write(byte[] buffer, int index, int count)
    {
        byte[] bytesToWrite = buffer;
        if (index != 0 || count != buffer.Length)
        {
            bytesToWrite = new MemoryStream(buffer, index, count).ToArray();
        }
        if (offset == 0)
        {
            cmdFirstChunk.Parameters.AddWithValue("@firstChunk", bytesToWrite);
            cmdFirstChunk.ExecuteNonQuery();
            offset = count;
        }
        else
        {
            paramChunk.Value = bytesToWrite;
            cmdAppendChunk.ExecuteNonQuery();
            offset += count;
        }
    }

    // Rest of the abstract Stream implementation
 }
<小时/>

使用 BlobStream

要使用这个新创建的 blob 流类,您需要将其插入到 BufferedStream 中。该类的设计很简单,仅处理将流写入表的列中。我将重用另一个示例中的表格:

CREATE TABLE [dbo].[Uploads](
    [Id] [int] IDENTITY(1,1) NOT NULL,
    [FileName] [varchar](256) NULL,
    [ContentType] [varchar](256) NULL,
    [FileData] [varbinary](max) NULL)

我将添加一个要序列化的虚拟对象:

[Serializable]
class HugeSerialized
{
    public byte[] theBigArray { get; set; }
}

最后,实际的序列化。我们首先将一条新记录插入 Uploads 表中,然后在新插入的 Id 上创建一个 BlobStream 并直接在此流中调用序列化:

using (SqlConnection conn = new SqlConnection(Settings.Default.connString))
{
    conn.Open();
    using (SqlTransaction trn = conn.BeginTransaction())
    {
        SqlCommand cmdInsert = new SqlCommand(
@"INSERT INTO dbo.Uploads (FileName, ContentType)
VALUES (@fileName, @contentType);
SET @id = SCOPE_IDENTITY();", conn, trn);
        cmdInsert.Parameters.AddWithValue("@fileName", "Demo");
        cmdInsert.Parameters.AddWithValue("@contentType", "application/octet-stream");
        SqlParameter paramId = new SqlParameter("@id", SqlDbType.Int);
        paramId.Direction = ParameterDirection.Output;
        cmdInsert.Parameters.Add(paramId);
        cmdInsert.ExecuteNonQuery();

        BlobStream blob = new BlobStream(
            conn, trn, "dbo", "Uploads", "FileData", "Id", paramId.Value);
        BufferedStream bufferedBlob = new BufferedStream(blob, 8040);

        HugeSerialized big = new HugeSerialized { theBigArray = new byte[1024 * 1024] };
        BinaryFormatter bf = new BinaryFormatter();
        bf.Serialize(bufferedBlob, big);

        trn.Commit();
    }
}
<小时/>

如果您监视这个简单示例的执行,您会发现没有任何地方创建大型序列化流。该示例将分配 [1024*1024] 的数组,但这只是出于演示目的,以便有一些内容可以序列化。此代码以缓冲方式逐 block 序列化,使用 SQL Server BLOB 建议的每次更新大小 8040 字节。

关于.net - 如何在不创建大缓冲区的情况下将大型 .NET 对象图序列化为 SQL Server BLOB?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/2101149/

相关文章:

c# - 从 Dotnet Google API 获取用户电子邮件信息

c# - 优化 Parallel.For 的性能

c# - Lucene.Net - 如何将空格分隔的短语视为单个标记?

带有 WHERE 子句的 SQL 查询 CASE 语句

java - 序列化/反序列化 LinkedHashMap (android) java

c# - 数据库迁移和 View

sql - 如何按排序值选择数据顺序并按名称对 NULL 进行排序

sql-server - 无法解决等于操作中 "SQL_Latin1_General_Pref_CP1_CI_AS"和 "Latin1_General_CI_AS"之间的排序规则冲突

c# - mono 是否支持带有preserveObjectReferences 标志的DataContractSerializer?

c# - 可序列化与非序列化类属性