Dot-Net

如何在不創建大緩衝區的情況下將 .NET 對象的大圖序列化為 SQL Server BLOB?

  • January 20, 2010

我們有如下程式碼:

ms = New IO.MemoryStream
bin = New System.Runtime.Serialization.Formatters.Binary.BinaryFormatter
bin.Serialize(ms, largeGraphOfObjects)
dataToSaveToDatabase = ms.ToArray()
// put dataToSaveToDatabase in a Sql server BLOB

但是記憶體流從給我們帶來問題的大記憶體堆中分配了一個**大緩衝區。**那麼我們如何在不需要足夠的空閒記憶體來保存序列化對象的情況下流式傳輸數據。

我正在尋找一種從 SQL 伺服器獲取 Stream 的方法,然後可以將其傳遞給 bin.Serialize() 以避免將所有數據保留在我的程序記憶體中。

同樣用於讀取數據……


一些更多的背景。

這是一個複雜的數值處理系統的一部分,它近乎實時地處理數據,尋找設備問題等,序列化是為了在數據饋送等數據質量出現問題時允許重新啟動。(我們儲存數據饋送並且可以在操作員編輯掉錯誤值後重新執行它們。)

因此,我們序列化對象的頻率比反序列化它們的頻率要高得多。

我們正在序列化的對象包括非常大的數組,主要是雙精度數以及許多小的“更正常”的對象。我們正在推動 32 位系統的記憶體限制,並使垃圾收集器非常努力地工作。(系統中的其他地方正在改進這一點,例如重用大型陣列,而不是創建新陣列。)

通常狀態的序列化是導致記憶體不足異常的最後一根稻草;我們的記憶體使用高峰總是在這個序列化步驟中。

認為當我們反序列化對象時會出現大記憶體池碎片,我預計考慮到數組的大小,大記憶體池碎片還會存在其他問題。(這個還沒有研究,因為第一次看這個的人是數字處理專家,而不是記憶體管理專家。)

我們的客戶混合使用 SQL Server 2000、2005 和 2008,如果可能,我們不希望每個版本的 SQL Server 都有不同的程式碼路徑。

我們一次可以有許多活動模型(在不同的程序中,跨多台機器),每個模型可以有許多保存的狀態。因此,保存的狀態儲存在數據庫 blob 中,而不是文件中。

由於保存狀態的傳播很重要,我寧願不將對象序列化為文件,然後將文件一次一個塊地放入 BLOB 中。

我問過的其他相關問題

沒有內置的 ADO.Net 功能可以真正優雅地處理大數據。問題有兩個方面:

  • 沒有 API 可以將 SQL 命令或參數“寫入”到流中。接受流的參數類型(如FileStream)接受要從中讀取的流,這與寫入流的序列化語義不一致。無論您採用哪種方式,最終都會得到整個序列化對象的記憶體副本,很糟糕。
  • 即使上述問題可以解決(而且不可能),TDS 協議和 SQL Server 接受參數的方式也不能很好地處理大參數,因為在啟動執行之前必須首先接收整個請求,這將在 SQL Server 中創建對象的其他副本。

所以你真的必須從不同的角度來解決這個問題。幸運的是,有一個相當簡單的解決方案。訣竅是使用高效的UPDATE .WRITE語法並在一系列 T-SQL 語句中逐個傳遞數據塊。這是 MSDN 推薦的方式,請參閱Modifying Large-Value (max) Data in ADO.NET。這看起來很複雜,但實際上做起來很簡單,並且插入到 Stream 類中。


BlobStream 類

這是解決方案的基礎。將 Write 方法實現為對 T-SQL BLOB WRITE 語法的呼叫的 Stream 派生類。直截了當,唯一有趣的是它必須跟踪第一次更新,因為UPDATE ... SET blob.WRITE(...)語法會在 NULL 欄位上失敗:

class BlobStream: Stream
{
   private SqlCommand cmdAppendChunk;
   private SqlCommand cmdFirstChunk;
   private SqlConnection connection;
   private SqlTransaction transaction;

   private SqlParameter paramChunk;
   private SqlParameter paramLength;

   private long offset;

   public BlobStream(
       SqlConnection connection,
       SqlTransaction transaction,
       string schemaName,
       string tableName,
       string blobColumn,
       string keyColumn,
       object keyValue)
   {
       this.transaction = transaction;
       this.connection = connection;
       cmdFirstChunk = new SqlCommand(String.Format(@"
UPDATE [{0}].[{1}]
   SET [{2}] = @firstChunk
   WHERE [{3}] = @key"
           ,schemaName, tableName, blobColumn, keyColumn)
           , connection, transaction);
       cmdFirstChunk.Parameters.AddWithValue("@key", keyValue);
       cmdAppendChunk = new SqlCommand(String.Format(@"
UPDATE [{0}].[{1}]
   SET [{2}].WRITE(@chunk, NULL, NULL)
   WHERE [{3}] = @key"
           , schemaName, tableName, blobColumn, keyColumn)
           , connection, transaction);
       cmdAppendChunk.Parameters.AddWithValue("@key", keyValue);
       paramChunk = new SqlParameter("@chunk", SqlDbType.VarBinary, -1);
       cmdAppendChunk.Parameters.Add(paramChunk);
   }

   public override void Write(byte[] buffer, int index, int count)
   {
       byte[] bytesToWrite = buffer;
       if (index != 0 || count != buffer.Length)
       {
           bytesToWrite = new MemoryStream(buffer, index, count).ToArray();
       }
       if (offset == 0)
       {
           cmdFirstChunk.Parameters.AddWithValue("@firstChunk", bytesToWrite);
           cmdFirstChunk.ExecuteNonQuery();
           offset = count;
       }
       else
       {
           paramChunk.Value = bytesToWrite;
           cmdAppendChunk.ExecuteNonQuery();
           offset += count;
       }
   }

   // Rest of the abstract Stream implementation
}

使用 BlobStream

要使用這個新創建的 blob 流類,您需要插入一個BufferedStream. 該類有一個簡單的設計,只處理將流寫入表的列。我將重用另一個範例中的表格:

CREATE TABLE [dbo].[Uploads](
   [Id] [int] IDENTITY(1,1) NOT NULL,
   [FileName] [varchar](256) NULL,
   [ContentType] [varchar](256) NULL,
   [FileData] [varbinary](max) NULL)

我將添加一個要序列化的虛擬對象:

[Serializable]
class HugeSerialized
{
   public byte[] theBigArray { get; set; }
}

最後是實際的序列化。我們將首先在表中插入一條新記錄Uploads,然後在新插入的 Id 上創建一個BlobStream並將序列化直接呼叫到此流中:

using (SqlConnection conn = new SqlConnection(Settings.Default.connString))
{
   conn.Open();
   using (SqlTransaction trn = conn.BeginTransaction())
   {
       SqlCommand cmdInsert = new SqlCommand(
@"INSERT INTO dbo.Uploads (FileName, ContentType)
VALUES (@fileName, @contentType);
SET @id = SCOPE_IDENTITY();", conn, trn);
       cmdInsert.Parameters.AddWithValue("@fileName", "Demo");
       cmdInsert.Parameters.AddWithValue("@contentType", "application/octet-stream");
       SqlParameter paramId = new SqlParameter("@id", SqlDbType.Int);
       paramId.Direction = ParameterDirection.Output;
       cmdInsert.Parameters.Add(paramId);
       cmdInsert.ExecuteNonQuery();

       BlobStream blob = new BlobStream(
           conn, trn, "dbo", "Uploads", "FileData", "Id", paramId.Value);
       BufferedStream bufferedBlob = new BufferedStream(blob, 8040);

       HugeSerialized big = new HugeSerialized { theBigArray = new byte[1024 * 1024] };
       BinaryFormatter bf = new BinaryFormatter();
       bf.Serialize(bufferedBlob, big);

       trn.Commit();
   }
}

如果您監視這個簡單範例的執行,您將看到沒有創建大型序列化流。該範例將分配 [1024*1024] 的數組,但這是出於展示目的而需要序列化的內容。此程式碼以緩衝方式逐塊序列化,使用 SQL Server BLOB 建議的一次更新大小為 8040 字節。

引用自:https://stackoverflow.com/questions/2101149