我有一个 SQL Server 数据库,表 main
中有 500,000 条记录。还有其他三个表,分别称为 child1
、child2
和 child3
。 child1
、child2
、child3
和main
之间的多对多关系是通过三个关系表实现的: main_child1_relationship
、main_child2_relationship
和 main_child3_relationship
。我需要读取 main
中的记录,更新 main
,并将新行插入到关系表中,并在子表中插入新记录。子表中的记录具有唯一性约束,因此实际计算 (CalculateDetails) 的伪代码类似于:
for each record in main
{
find its child1 like qualities
for each one of its child1 qualities
{
find the record in child1 that matches that quality
if found
{
add a record to main_child1_relationship to connect the two records
}
else
{
create a new record in child1 for the quality mentioned
add a record to main_child1_relationship to connect the two records
}
}
...repeat the above for child2
...repeat the above for child3
}
这作为单线程应用程序运行良好。但它太慢了。 C# 中的处理任务繁重且耗时太长。我想把它变成一个多线程应用程序。
最好的方法是什么?我们正在使用 Linq to Sql。
到目前为止,我的方法是为来自 main
的每批记录创建一个新的 DataContext
对象,并使用 ThreadPool.QueueUserWorkItem
来处理它。然而,这些批处理相互踩着对方的脚趾,因为一个线程添加了一条记录,然后下一个线程尝试添加相同的记录,然后......我遇到了各种有趣的 SQL Server 死锁。
代码如下:
int skip = 0;
List<int> thisBatch;
Queue<List<int>> allBatches = new Queue<List<int>>();
do
{
thisBatch = allIds
.Skip(skip)
.Take(numberOfRecordsToPullFromDBAtATime).ToList();
allBatches.Enqueue(thisBatch);
skip += numberOfRecordsToPullFromDBAtATime;
} while (thisBatch.Count() > 0);
while (allBatches.Count() > 0)
{
RRDataContext rrdc = new RRDataContext();
var currentBatch = allBatches.Dequeue();
lock (locker)
{
runningTasks++;
}
System.Threading.ThreadPool.QueueUserWorkItem(x =>
ProcessBatch(currentBatch, rrdc));
lock (locker)
{
while (runningTasks > MAX_NUMBER_OF_THREADS)
{
Monitor.Wait(locker);
UpdateGUI();
}
}
}
这里是 ProcessBatch:
private static void ProcessBatch(
List<int> currentBatch, RRDataContext rrdc)
{
var topRecords = GetTopRecords(rrdc, currentBatch);
CalculateDetails(rrdc, topRecords);
rrdc.Dispose();
lock (locker)
{
runningTasks--;
Monitor.Pulse(locker);
};
}
和
private static List<Record> GetTopRecords(RecipeRelationshipsDataContext rrdc,
List<int> thisBatch)
{
List<Record> topRecords;
topRecords = rrdc.Records
.Where(x => thisBatch.Contains(x.Id))
.OrderBy(x => x.OrderByMe).ToList();
return topRecords;
}
CalculateDetails
最好用顶部的伪代码来解释。
我认为一定有更好的方法来做到这一点。请帮忙。非常感谢!
最佳答案
这是我对这个问题的看法:
当使用多个线程在 SQL Server 或任何数据库中插入/更新/查询数据时,死锁是不可避免的。您必须假设它们会发生并适本地处理它们。
这并不是说我们不应该尝试限制死锁的发生。但是,很容易了解 deadlocks 的基本原因。并采取措施防止它们发生,但 SQL Server 总是会让您大吃一惊:-)
一些死锁的原因:
线程太多 - 尝试将线程数限制在最低限度,但当然我们需要更多线程以获得最佳性能。
没有足够的索引。如果选择和更新没有足够的选择性,SQL 将取出比健康范围更大的锁。尝试指定适当的索引。
索引过多。更新索引会导致死锁,因此请尽量将索引减少到所需的最低限度。
事务隔离级别太高。默认 isolation level使用 .NET 时是“Serializable”,而默认使用 SQL Server 时是“Read Committed”。降低隔离级别会有很大帮助(当然如果合适的话)。
这就是我可能会如何解决您的问题:
我不会推出自己的线程解决方案,我会使用 TaskParallel 库。我的主要方法看起来像这样:
using (var dc = new TestDataContext()) { // Get all the ids of interest. // I assume you mark successfully updated rows in some way // in the update transaction. List<int> ids = dc.TestItems.Where(...).Select(item => item.Id).ToList(); var problematicIds = new List<ErrorType>(); // Either allow the TaskParallel library to select what it considers // as the optimum degree of parallelism by omitting the // ParallelOptions parameter, or specify what you want. Parallel.ForEach(ids, new ParallelOptions {MaxDegreeOfParallelism = 8}, id => CalculateDetails(id, problematicIds)); }
执行 CalculateDetails 方法并重试死锁失败
private static void CalculateDetails(int id, List<ErrorType> problematicIds) { try { // Handle deadlocks DeadlockRetryHelper.Execute(() => CalculateDetails(id)); } catch (Exception e) { // Too many deadlock retries (or other exception). // Record so we can diagnose problem or retry later problematicIds.Add(new ErrorType(id, e)); } }
核心的CalculateDetails方法
private static void CalculateDetails(int id) { // Creating a new DeviceContext is not expensive. // No need to create outside of this method. using (var dc = new TestDataContext()) { // TODO: adjust IsolationLevel to minimize deadlocks // If you don't need to change the isolation level // then you can remove the TransactionScope altogether using (var scope = new TransactionScope( TransactionScopeOption.Required, new TransactionOptions {IsolationLevel = IsolationLevel.Serializable})) { TestItem item = dc.TestItems.Single(i => i.Id == id); // work done here dc.SubmitChanges(); scope.Complete(); } } }
当然还有我对死锁重试助手的实现
public static class DeadlockRetryHelper { private const int MaxRetries = 4; private const int SqlDeadlock = 1205; public static void Execute(Action action, int maxRetries = MaxRetries) { if (HasAmbientTransaction()) { // Deadlock blows out containing transaction // so no point retrying if already in tx. action(); } int retries = 0; while (retries < maxRetries) { try { action(); return; } catch (Exception e) { if (IsSqlDeadlock(e)) { retries++; // Delay subsequent retries - not sure if this helps or not Thread.Sleep(100 * retries); } else { throw; } } } action(); } private static bool HasAmbientTransaction() { return Transaction.Current != null; } private static bool IsSqlDeadlock(Exception exception) { if (exception == null) { return false; } var sqlException = exception as SqlException; if (sqlException != null && sqlException.Number == SqlDeadlock) { return true; } if (exception.InnerException != null) { return IsSqlDeadlock(exception.InnerException); } return false; } }
另一种可能性是使用分区策略
如果您的表可以自然地划分为几个不同的数据集,那么您可以使用 SQL Server partitioned tables and indexes ,或者你可以 manually split您现有的表格分成几组表格。我建议使用 SQL Server 的分区,因为第二个选项会很困惑。此外,内置分区仅在 SQL Enterprise Edition 上可用。
如果分区对您来说是可能的,您可以选择一个分区方案,将您的数据分成 8 个不同的集合。现在您可以使用原来的单线程代码,但有 8 个线程,每个线程针对一个单独的分区。现在不会有任何(或至少最少数量的)死锁。
我希望这是有道理的。
关于c# - 具有 SQL Server 数据库调用的多线程 C# 应用程序,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/9952137/