DataTable "Array dimensions exceeded supported range" when calling dataTable.NewRow()

Problem description

For some crazy reason, I'm getting an OutOfMemoryException while transferring data to SQL in reasonable chunks, with barely any memory in use:

System.OutOfMemoryException: Exception of type 'System.OutOfMemoryException' was thrown.
   at System.Data.DataTable.NewRowArray(Int32 size)
   at System.Data.RecordManager.GrowRecordCapacity()
   at System.Data.RecordManager.NewRecordBase()
   at System.Data.DataTable.NewRecord(Int32 sourceRecord)
   at Axis.PortfolioAccumulation.Data.PortfolioAccumulationDbContext.d__22`1.MoveNext() in D:\Azure_DevOps_Reins_Prod_Agent_A\_work\7\s\Axis.PortfolioAccumulation.DataLayer\Axis.PortfolioAccumulation.Data\BulkInsert\StreamedsqlBulkcopy.cs:line 46

The error occurs on the call to dataTable.NewRow() in the while loop below, once I pass roughly the 30-millionth row:

/// <summary>Helper to stream a large number of records into SQL without
/// ever having to materialize the entire enumerable into memory at once.</summary>
/// <param name="destinationTableName">The name of the table in the database to copy data to.</param>
/// <param name="dataTable">A new instance of the DataTable class that matches the schema of the table to insert to.
/// This should match exactly (same column names) what is in SQL, for automatic column mapping to work.</param>
/// <param name="sourceData">The enumerable of data that will be used to generate DataRows.</param>
/// <param name="populateRow">A delegate function that populates and returns a new data row for a given record.</param>
/// <param name="memoryBatchSize">The number of DataRows to generate in memory before passing them to SqlBulkCopy.</param>
/// <param name="insertBatchSize">The batch size of inserts performed by the SqlBulkCopy utility.</param>
public async Task StreamedSqlBulkCopy<T>(
    string destinationTableName, DataTable dataTable, IEnumerable<T> sourceData,
    Func<T, DataRow, DataRow> populateRow, int memoryBatchSize = 1000000, int insertBatchSize = 5000)
{
    using (SqlConnection connection = new SqlConnection(Database.Connection.ConnectionString))
    {
        connection.Open();
        using (SqlBulkCopy bulkCopy = new SqlBulkCopy(connection, SqlBulkCopyOptions.TableLock, null))
        using (IEnumerator<T> enumerator = sourceData.GetEnumerator())
        {
            // Configure the single SqlBulkCopy instance that will be used to copy all "batches"
            bulkCopy.DestinationTableName = destinationTableName;
            bulkCopy.BatchSize = insertBatchSize;
            bulkCopy.BulkCopyTimeout = _bulkInsertTimeOut;
            foreach (DataColumn column in dataTable.Columns)
                bulkCopy.ColumnMappings.Add(column.ColumnName, column.ColumnName);

            // Begin enumerating over all records, preparing batches no larger than "memoryBatchSize"
            bool hasNext = true;
            while (hasNext)
            {
                DataRow[] batch = new DataRow[memoryBatchSize];
                int filled = 0;
                // Check "filled" before MoveNext() so the record at a batch boundary
                // is not consumed from the enumerator and silently dropped
                while (filled < memoryBatchSize && (hasNext = enumerator.MoveNext()))
                    batch[filled++] = populateRow(enumerator.Current, dataTable.NewRow());

                // When we reach the end of the enumerable, we need to shrink the final buffer array
                if (filled < memoryBatchSize)
                    Array.Resize(ref batch, filled);

                await bulkCopy.WriteToServerAsync(batch);
            }
        }
    }
}

As is hopefully clear, the purpose of the helper above is to stream a (very large) IEnumerable<T> of data into a SQL table via SqlBulkCopy, given a reader and a delegate that populates a row for each element.

Example usage:

public async Task SaveExchangeRates(List<FxRate> fxRates)
{
    var createDate = DateTimeOffset.UtcNow;
    await StreamedSqlBulkCopy("RefData.ExchangeRate", GetExchangeRateDataTable(), fxRates, (fx, newRow) =>
        {
            newRow["BaseCurrency"] = "USD";
            newRow["TargetCurrency"] = fx.CurrencyCode;
            newRow["ExchangeDate"] = fx.ExchangeRateDate;
            newRow["DollarValue"] = fx.ValueInUsd;
            return newRow;
        });
}

private DataTable GetExchangeRateDataTable()
{
    var dataTable = new DataTable();
    dataTable.Columns.Add("ExchangeDate", typeof(DateTime));
    dataTable.Columns.Add("BaseCurrency", typeof(string));
    dataTable.Columns.Add("TargetCurrency", typeof(string));
    dataTable.Columns.Add("DollarValue", typeof(double));
    return dataTable;
}

Workaround

It turns out that even if you only use a DataTable instance as an empty structure for schema purposes, and even if you never call dataTable.Rows.Add() to actually add rows to it, every call to NewRow increments an internal counter, and apparently even grows a placeholder array, in the expectation that you will eventually insert all of those rows.
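This is easy to reproduce in isolation. Below is a minimal sketch (the column name, loop count, and reset interval are illustrative, not taken from the original code): no row is ever added to the table, yet without the periodic clone the internal record storage keeps growing until NewRowArray fails.

```csharp
using System;
using System.Data;

class NewRowGrowthRepro
{
    static void Main()
    {
        var table = new DataTable();
        table.Columns.Add("Id", typeof(int));

        for (int i = 1; i <= 20_000_000; i++)
        {
            table.NewRow(); // never added, but still tracked internally

            // Without this periodic reset, the hidden record array keeps growing
            // and eventually throws OutOfMemoryException / "Array dimensions
            // exceeded supported range" in DataTable.NewRowArray.
            if (i % 1_000_000 == 0)
                table = table.Clone(); // schema-only copy with an empty record store
        }
    }
}
```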

In any case, the workaround is to periodically "reset" the template by overwriting it with a clone of itself:

dataTable = dataTable.Clone();

Not elegant for sure, but easier than trying to implement your own IDataReader, which is the only other way to take advantage of SqlBulkCopy. (That said: anyone else trying to stream into SQL bulk copy who isn't constrained to avoid 3rd-party libraries like I was should check out Marc Gravell's ObjectReader in the FastMember package and this answer: https://stackoverflow.com/a/47208127/529618)
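For reference, a sketch of that FastMember approach, under the assumption that the FastMember NuGet package is referenced; the table name and members are borrowed from the exchange-rate example above, and explicit ColumnMappings would be needed wherever a member name differs from the destination column:

```csharp
using System.Collections.Generic;
using System.Data.SqlClient;
using System.Threading.Tasks;
using FastMember;

public static class FastMemberBulkCopy
{
    public static async Task BulkCopyExchangeRates(string connectionString, IEnumerable<FxRate> fxRates)
    {
        using (var connection = new SqlConnection(connectionString))
        {
            await connection.OpenAsync();
            using (var bulkCopy = new SqlBulkCopy(connection, SqlBulkCopyOptions.TableLock, null))
            // ObjectReader exposes the enumerable as a streaming IDataReader,
            // so no DataTable or DataRow instances are materialized at all.
            using (var reader = ObjectReader.Create(fxRates,
                "CurrencyCode", "ExchangeRateDate", "ValueInUsd"))
            {
                bulkCopy.DestinationTableName = "RefData.ExchangeRate";
                // Map source members to destination columns where the names differ
                bulkCopy.ColumnMappings.Add("CurrencyCode", "TargetCurrency");
                bulkCopy.ColumnMappings.Add("ExchangeRateDate", "ExchangeDate");
                bulkCopy.ColumnMappings.Add("ValueInUsd", "DollarValue");
                await bulkCopy.WriteToServerAsync(reader);
            }
        }
    }
}
```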

Alternative workaround

Another way to simplify things (at the cost of some extra overhead) is to accept our fate and use the DataTable class itself instead of an array of DataRows, creating a Clone() of the original table periodically to avoid the apparent hard maximum of 16,777,216 rows.

I don't understand why DataTable maintains an array for every row you create with it, even if those rows are never added; but since it does, we may as well take advantage of it instead of allocating our own array.

Some of the overhead of using a DataTable can be offset by setting its initial capacity to ensure it doesn't grow (re-allocate memory) and by disabling as many events as possible:

The relevant change looks like this:

bool hasNext = true;
while (hasNext)
{
    using (DataTable tableChunk = dataTable.Clone())
    {
        tableChunk.MinimumCapacity = memoryBatchSize + 1; // Avoid triggering resizing
        tableChunk.BeginLoadData(); // Speeds up inserting a large volume of rows a little
        int filled = 0;
        // Check "filled" before MoveNext() so the record at a chunk boundary
        // is not consumed from the enumerator and silently dropped
        while (filled++ < memoryBatchSize && (hasNext = enumerator.MoveNext()))
            tableChunk.Rows.Add(populateRow(enumerator.Current, tableChunk.NewRow()));
        await bulkCopy.WriteToServerAsync(tableChunk);
    }
}
