c# - 将大型 CSV 文件并行导出到 SQL Server

标签 c# sql sql-server multithreading csv

我的硬盘上有一个很大的 CSV 文件……10 列,1 亿行,大约 6 GB。 我想逐行读取此 CSV 文件,然后使用 SQL 批量复制将数据加载到 Microsoft SQL 服务器数据库中。 我在这里和互联网上阅读了几个线程。大多数人认为并行读取 CSV 文件在效率方面并没有太大意义,因为任务/线程争用磁盘访问。

我想要做的是,从 CSV 中逐行读取并将其添加到大小为 100K 行的阻塞集合中。一旦这个集合完全启动一个新的任务/线程,使用 SQLBuckCopy API 将数据写入 SQL 服务器。

我已经编写了这段代码,但在运行时遇到了一个错误,提示“尝试对具有挂起操作的对象调用批量复制。”这种情况看起来可以使用 .NET 4.0 TPL 轻松解决,但我无法让它工作。对我做错了什么有什么建议吗?

    public static void LoadCsvDataInParalleToSqlServer(string fileName, string connectionString, string table, DataColumn[] columns, bool truncate)
    {
        const int inputCollectionBufferSize = 1000000;
        const int bulkInsertBufferCapacity = 100000;
        const int bulkInsertConcurrency = 8;

        var sqlConnection = new SqlConnection(connectionString);
        sqlConnection.Open();

        var sqlBulkCopy = new SqlBulkCopy(sqlConnection.ConnectionString, SqlBulkCopyOptions.TableLock)
        {
            EnableStreaming = true,
            BatchSize = bulkInsertBufferCapacity,
            DestinationTableName = table,
            BulkCopyTimeout = (24 * 60 * 60),
        };

        BlockingCollection<DataRow> rows = new BlockingCollection<DataRow>(inputCollectionBufferSize);
        DataTable dataTable = new DataTable(table);
        dataTable.Columns.AddRange(columns);

        Task loadTask = Task.Factory.StartNew(() =>
            {
                foreach (DataRow row in ReadRows(fileName, dataTable))
                {
                    rows.Add(row);
                }

                rows.CompleteAdding();
            });

        List<Task> insertTasks = new List<Task>(bulkInsertConcurrency);

        for (int i = 0; i < bulkInsertConcurrency; i++)
        {
            insertTasks.Add(Task.Factory.StartNew((x) =>
                {
                    List<DataRow> bulkInsertBuffer = new List<DataRow>(bulkInsertBufferCapacity);

                    foreach (DataRow row in rows.GetConsumingEnumerable())
                    {
                        if (bulkInsertBuffer.Count == bulkInsertBufferCapacity)
                        {
                            SqlBulkCopy bulkCopy = x as SqlBulkCopy;
                            var dataRows = bulkInsertBuffer.ToArray();
                            bulkCopy.WriteToServer(dataRows);
                            Console.WriteLine("Inserted rows " + bulkInsertBuffer.Count);
                            bulkInsertBuffer.Clear();
                        }

                        bulkInsertBuffer.Add(row);
                    }

                },
                sqlBulkCopy));
        }

        loadTask.Wait();
        Task.WaitAll(insertTasks.ToArray());
    }

    private static IEnumerable<DataRow> ReadRows(string fileName, DataTable dataTable)
    {
        using (var textFieldParser = new TextFieldParser(fileName))
        {
            textFieldParser.TextFieldType = FieldType.Delimited;
            textFieldParser.Delimiters = new[] { "," };
            textFieldParser.HasFieldsEnclosedInQuotes = true;

            while (!textFieldParser.EndOfData)
            {
                string[] cols = textFieldParser.ReadFields();

                DataRow row = dataTable.NewRow();

                for (int i = 0; i < cols.Length; i++)
                {
                    if (string.IsNullOrEmpty(cols[i]))
                    {
                        row[i] = DBNull.Value;
                    }
                    else
                    {
                        row[i] = cols[i];
                    }
                }

                yield return row;
            }
        }
    }

最佳答案

不要。

并行访问可能会或可能不会让您更快地读取文件(它不会,但我不会打战斗......)但对于某些并行写入它赢了'给你更快的批量插入。那是因为最少记录的大容量插入(即非常快大容量插入)需要表锁。参见 Prerequisites for Minimal Logging in Bulk Import :

Minimal logging requires that the target table meets the following conditions:

...
- Table locking is specified (using TABLOCK).
...

并行插入,顾名思义,不能获得并发表锁。 QED。你找错人了。

停止从互联网上随机查找您的来源。阅读The Data Loading Performance Guide ,是关于......高性能数据加载的指南。

我会建议您停止发明轮子。使用 SSIS ,这正是旨在处理的问题。

关于c# - 将大型 CSV 文件并行导出到 SQL Server,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26500523/

相关文章:

sql-server - SQL Server 中的 Postgres MD5() 等效项是什么?

sql-server - 转换为日期时间 MM/dd/yyyy HH :mm:ss in Sql Server

c# - 如何在 UWP 应用程序中打印 pdf 文件后减少内存消耗

c# - 从中间件发送 JSON 响应

mysql - 在mysql中使用case语句更新表

sql - 将所有与不存在的并入并插入

c# - 为什么将 .Wait() 添加到我的任务有时会允许任务使用主线程?

c# - 将 SolidColorBrush 绑定(bind)到背景,绑定(bind)错误

sql - Hibernate setMaxResults() 不适用于 Sybase 数据库查询

sql - Oracle SQL 语句中条件的顺序重要吗?