我的硬盘上有一个很大的 CSV 文件……10 列,1 亿行,大约 6 GB。 我想逐行读取此 CSV 文件,然后使用 SQL 批量复制将数据加载到 Microsoft SQL 服务器数据库中。 我在这里和互联网上阅读了几个线程。大多数人认为并行读取 CSV 文件在效率方面并没有太大意义,因为任务/线程争用磁盘访问。
我想要做的是,从 CSV 中逐行读取并将其添加到大小为 100K 行的阻塞集合中。一旦这个集合完全启动一个新的任务/线程,使用 SQLBuckCopy API 将数据写入 SQL 服务器。
我已经编写了这段代码,但在运行时遇到了一个错误,提示“尝试对具有挂起操作的对象调用批量复制。”这种情况看起来可以使用 .NET 4.0 TPL 轻松解决,但我无法让它工作。对我做错了什么有什么建议吗?
public static void LoadCsvDataInParalleToSqlServer(string fileName, string connectionString, string table, DataColumn[] columns, bool truncate)
{
const int inputCollectionBufferSize = 1000000;
const int bulkInsertBufferCapacity = 100000;
const int bulkInsertConcurrency = 8;
var sqlConnection = new SqlConnection(connectionString);
sqlConnection.Open();
var sqlBulkCopy = new SqlBulkCopy(sqlConnection.ConnectionString, SqlBulkCopyOptions.TableLock)
{
EnableStreaming = true,
BatchSize = bulkInsertBufferCapacity,
DestinationTableName = table,
BulkCopyTimeout = (24 * 60 * 60),
};
BlockingCollection<DataRow> rows = new BlockingCollection<DataRow>(inputCollectionBufferSize);
DataTable dataTable = new DataTable(table);
dataTable.Columns.AddRange(columns);
Task loadTask = Task.Factory.StartNew(() =>
{
foreach (DataRow row in ReadRows(fileName, dataTable))
{
rows.Add(row);
}
rows.CompleteAdding();
});
List<Task> insertTasks = new List<Task>(bulkInsertConcurrency);
for (int i = 0; i < bulkInsertConcurrency; i++)
{
insertTasks.Add(Task.Factory.StartNew((x) =>
{
List<DataRow> bulkInsertBuffer = new List<DataRow>(bulkInsertBufferCapacity);
foreach (DataRow row in rows.GetConsumingEnumerable())
{
if (bulkInsertBuffer.Count == bulkInsertBufferCapacity)
{
SqlBulkCopy bulkCopy = x as SqlBulkCopy;
var dataRows = bulkInsertBuffer.ToArray();
bulkCopy.WriteToServer(dataRows);
Console.WriteLine("Inserted rows " + bulkInsertBuffer.Count);
bulkInsertBuffer.Clear();
}
bulkInsertBuffer.Add(row);
}
},
sqlBulkCopy));
}
loadTask.Wait();
Task.WaitAll(insertTasks.ToArray());
}
private static IEnumerable<DataRow> ReadRows(string fileName, DataTable dataTable)
{
using (var textFieldParser = new TextFieldParser(fileName))
{
textFieldParser.TextFieldType = FieldType.Delimited;
textFieldParser.Delimiters = new[] { "," };
textFieldParser.HasFieldsEnclosedInQuotes = true;
while (!textFieldParser.EndOfData)
{
string[] cols = textFieldParser.ReadFields();
DataRow row = dataTable.NewRow();
for (int i = 0; i < cols.Length; i++)
{
if (string.IsNullOrEmpty(cols[i]))
{
row[i] = DBNull.Value;
}
else
{
row[i] = cols[i];
}
}
yield return row;
}
}
}
最佳答案
不要。
并行访问可能会或可能不会让您更快地读取文件(它不会,但我不会打那战斗......)但对于某些并行写入它赢了'给你更快的批量插入。那是因为最少记录的大容量插入(即非常快大容量插入)需要表锁。参见 Prerequisites for Minimal Logging in Bulk Import :
Minimal logging requires that the target table meets the following conditions:
...
- Table locking is specified (using TABLOCK).
...
并行插入,顾名思义,不能获得并发表锁。 QED。你找错人了。
停止从互联网上随机查找您的来源。阅读The Data Loading Performance Guide ,是关于......高性能数据加载的指南。
我会建议您停止发明轮子。使用 SSIS ,这正是旨在处理的问题。
关于c# - 将大型 CSV 文件并行导出到 SQL Server,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26500523/