c# - 具有 SQL Server 数据库调用的多线程 C# 应用程序

标签 c# sql sql-server multithreading architecture

我有一个 SQL Server 数据库,表 main 中有 500,000 条记录。还有其他三个表,分别称为 child1child2child3child1child2child3main 之间的多对多关系是通过三个关系表实现的: main_child1_relationshipmain_child2_relationshipmain_child3_relationship。我需要读取 main 中的记录,更新 main,并将新行插入到关系表中,并在子表中插入新记录。子表中的记录具有唯一性约束,因此实际计算 (CalculateDetails) 的伪代码类似于:

for each record in main
{
   find its child1 like qualities
   for each one of its child1 qualities
   {
      find the record in child1 that matches that quality
      if found
      {
          add a record to main_child1_relationship to connect the two records
      }
      else
      {
          create a new record in child1 for the quality mentioned
          add a record to main_child1_relationship to connect the two records
      }
   }
   ...repeat the above for child2
   ...repeat the above for child3 
}

这作为单线程应用程序运行良好。但它太慢了。 C# 中的处理任务繁重且耗时太长。我想把它变成一个多线程应用程序。

最好的方法是什么?我们正在使用 Linq to Sql。

到目前为止,我的方法是为来自 main 的每批记录创建一个新的 DataContext 对象,并使用 ThreadPool.QueueUserWorkItem 来处理它。然而,这些批处理相互踩着对方的脚趾,因为一个线程添加了一条记录,然后下一个线程尝试添加相同的记录,然后......我遇到了各种有趣的 SQL Server 死锁。

代码如下:

    int skip = 0;
    List<int> thisBatch;
    Queue<List<int>> allBatches = new Queue<List<int>>();
    do
    {
        thisBatch = allIds
                .Skip(skip)
                .Take(numberOfRecordsToPullFromDBAtATime).ToList();
        allBatches.Enqueue(thisBatch);
        skip += numberOfRecordsToPullFromDBAtATime;

    } while (thisBatch.Count() > 0);

    while (allBatches.Count() > 0)
    {
        RRDataContext rrdc = new RRDataContext();

        var currentBatch = allBatches.Dequeue();
        lock (locker)  
        {
            runningTasks++;
        }
        System.Threading.ThreadPool.QueueUserWorkItem(x =>
                    ProcessBatch(currentBatch, rrdc));

        lock (locker) 
        {
            while (runningTasks > MAX_NUMBER_OF_THREADS)
            {
                 Monitor.Wait(locker);
                 UpdateGUI();
            }
        }
    }

这里是 ProcessBatch:

    private static void ProcessBatch( 
        List<int> currentBatch, RRDataContext rrdc)
    {
        var topRecords = GetTopRecords(rrdc, currentBatch);
        CalculateDetails(rrdc, topRecords);
        rrdc.Dispose();

        lock (locker)
        {
            runningTasks--;
            Monitor.Pulse(locker);
        };
    }

    private static List<Record> GetTopRecords(RecipeRelationshipsDataContext rrdc, 
                                              List<int> thisBatch)
    {
        List<Record> topRecords;

        topRecords = rrdc.Records
                    .Where(x => thisBatch.Contains(x.Id))
                    .OrderBy(x => x.OrderByMe).ToList();
        return topRecords;
    }

CalculateDetails 最好用顶部的伪代码来解释。

我认为一定有更好的方法来做到这一点。请帮忙。非常感谢!

最佳答案

这是我对这个问题的看法:

  • 当使用多个线程在 SQL Server 或任何数据库中插入/更新/查询数据时,死锁是不可避免的。您必须假设它们会发生并适本地处理它们。

  • 这并不是说我们不应该尝试限制死锁的发生。但是,很容易了解 deadlocks 的基本原因。并采取措施防止它们发生,但 SQL Server 总是会让您大吃一惊:-)

一些死锁的原因:

  • 线程太多 - 尝试将线程数限制在最低限度,但当然我们需要更多线程以获得最佳性能。

  • 没有足够的索引。如果选择和更新没有足够的选择性,SQL 将取出比健康范围更大的锁。尝试指定适当的索引。

  • 索引过多。更新索引会导致死锁,因此请尽量将索引减少到所需的最低限度。

  • 事务隔离级别太高。默认 isolation level使用 .NET 时是“Serializable”,而默认使用 SQL Server 时是“Read Committed”。降低隔离级别会有很大帮助(当然如果合适的话)。

这就是我可能会如何解决您的问题:

  • 我不会推出自己的线程解决方案,我会使用 TaskParallel 库。我的主要方法看起来像这样:

    using (var dc = new TestDataContext())
    {
        // Get all the ids of interest.
        // I assume you mark successfully updated rows in some way
        // in the update transaction.
        List<int> ids = dc.TestItems.Where(...).Select(item => item.Id).ToList();
    
        var problematicIds = new List<ErrorType>();
    
        // Either allow the TaskParallel library to select what it considers
        // as the optimum degree of parallelism by omitting the 
        // ParallelOptions parameter, or specify what you want.
        Parallel.ForEach(ids, new ParallelOptions {MaxDegreeOfParallelism = 8},
                            id => CalculateDetails(id, problematicIds));
    }
    
  • 执行 CalculateDetails 方法并重试死锁失败

    private static void CalculateDetails(int id, List<ErrorType> problematicIds)
    {
        try
        {
            // Handle deadlocks
            DeadlockRetryHelper.Execute(() => CalculateDetails(id));
        }
        catch (Exception e)
        {
            // Too many deadlock retries (or other exception). 
            // Record so we can diagnose problem or retry later
            problematicIds.Add(new ErrorType(id, e));
        }
    }
    
  • 核心的CalculateDetails方法

    private static void CalculateDetails(int id)
    {
        // Creating a new DeviceContext is not expensive.
        // No need to create outside of this method.
        using (var dc = new TestDataContext())
        {
            // TODO: adjust IsolationLevel to minimize deadlocks
            // If you don't need to change the isolation level 
            // then you can remove the TransactionScope altogether
            using (var scope = new TransactionScope(
                TransactionScopeOption.Required,
                new TransactionOptions {IsolationLevel = IsolationLevel.Serializable}))
            {
                TestItem item = dc.TestItems.Single(i => i.Id == id);
    
                // work done here
    
                dc.SubmitChanges();
                scope.Complete();
            }
        }
    }
    
  • 当然还有我对死锁重试助手的实现

    public static class DeadlockRetryHelper
    {
        private const int MaxRetries = 4;
        private const int SqlDeadlock = 1205;
    
        public static void Execute(Action action, int maxRetries = MaxRetries)
        {
            if (HasAmbientTransaction())
            {
                // Deadlock blows out containing transaction
                // so no point retrying if already in tx.
                action();
            }
    
            int retries = 0;
    
            while (retries < maxRetries)
            {
                try
                {
                    action();
                    return;
                }
                catch (Exception e)
                {
                    if (IsSqlDeadlock(e))
                    {
                        retries++;
                        // Delay subsequent retries - not sure if this helps or not
                        Thread.Sleep(100 * retries);
                    }
                    else
                    {
                        throw;
                    }
                }
            }
    
            action();
        }
    
        private static bool HasAmbientTransaction()
        {
            return Transaction.Current != null;
        }
    
        private static bool IsSqlDeadlock(Exception exception)
        {
            if (exception == null)
            {
                return false;
            }
    
            var sqlException = exception as SqlException;
    
            if (sqlException != null && sqlException.Number == SqlDeadlock)
            {
                return true;
            }
    
            if (exception.InnerException != null)
            {
                return IsSqlDeadlock(exception.InnerException);
            }
    
            return false;
        }
    }
    
  • 另一种可能性是使用分区策略

如果您的表可以自然地划分为几个不同的数据集,那么您可以使用 SQL Server partitioned tables and indexes ,或者你可以 manually split您现有的表格分成几组表格。我建议使用 SQL Server 的分区,因为第二个选项会很困惑。此外,内置分区仅在 SQL Enterprise Edition 上可用。

如果分区对您来说是可能的,您可以选择一个分区方案,将您的数据分成 8 个不同的集合。现在您可以使用原来的单线程代码,但有 8 个线程,每个线程针对一个单独的分区。现在不会有任何(或至少最少数量的)死锁。

我希望这是有道理的。

关于c# - 具有 SQL Server 数据库调用的多线程 C# 应用程序,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/9952137/

相关文章:

c# - MVVM 从 VM 访问 Richtextbox 流文档

c# - 适用于 Windows Phone 8.1 Silverlight 的数据库

c# - WPF:程序集解析器

sqlite,当表达到最大记录数时触发

php - $_PHP_SELF常量

sql-server - SQL Server 2008 R2 上的 SET IDENTITY_INSERT 出现奇怪错误(消息 8107)

c# - UTF-8 字节标记检查根据操作系统给出不同的值

sql - 如果任何字段匹配,如何对 postgres 表的结果进行分组?

sql - 从表列中的 SQL Server 日期值获取月份名称

sql-server - 复制 SQL Azure 数据仓库的架构?