sql - 分布式事务如何在线程环境中与同一个 DB 的多个连接一起工作?

标签 sql multithreading msdtc

我正在尝试确定分布式事务中多个数据库连接的行为。

我有一个长时间运行的进程,它产生一系列线程,然后每个线程负责管理其数据库连接等。所有这些都在事务范围内运行,每个线程都通过 DependentTransaction 在事务中登记。目的。

当我将这个过程并行执行时,我遇到了一些问题,即似乎存在某种阻止查询在事务上同时执行的块。

我想知道的是事务协调器如何处理从多个连接到同一个数据库的查询,以及是否建议跨线程传递连接对象?

我读过 MS SQL 只允许每个事务有一个连接,但我显然能够在同一事务中创建和初始化多个连接到同一个数据库。在打开连接时,如果没有“另一个 session 正在使用的事务上下文”异常,我根本无法并行执行线程。结果是连接必须等待执行而不是同时运行,最后代码运行完成,但由于这个锁定问题,线程应用程序没有净 yield 。

代码看起来像这样。

    Sub StartThreads()
        Using Scope As New TransactionScope
            Dim TL(100) As Tasks.Task
            Dim dTx As DependentTransaction
            For i As Int32 = 0 To 100
                Dim A(1) As Object
                dTx = CType(Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete), DependentTransaction)
                'A(0) = some_other_data
                A(1) = dTx 'the Dependent Transaction

                TL(i) = Tasks.Task.Factory.StartNew(AddressOf Me.ProcessData, A) 'Start the thread and add it to the array
            Next

            Tasks.Task.WaitAll(TL) 'Wait for threads to finish

            Scope.Complete()
        End Using
    End Sub
    Dim TransLock As New Object
    Sub ProcessData(ByVal A As Object)
        Dim DTX As DependentTransaction = A(1)
        Dim Trans As Transactions.TransactionScope
        Dim I As Int32
        Do While True
            Try
                SyncLock (TransLock)
                    Trans = New Transactions.TransactionScope(DTX, TimeSpan.FromMinutes(1))
                End SyncLock
                Exit Do
            Catch ex As TransactionAbortedException
                If ex.ToString.Contains("Failure while attempting to promote transaction") Then
                ElseIf ex.Message = "The transaction has aborted." Then
                    Throw New Exception(ex.ToString)
                    Exit Sub
                End If
                I += 1
                If I > 5 Then
                    Throw New Exception(ex.ToString)
                End If
            Catch ex As Exception

            End Try
            Thread.Sleep(10)
        Loop
        Using Trans
            Using DALS As New DAC.DALScope
                Do While True
                    Try
                        SyncLock (TransLock)
                            'This opens two connection to the same DB for later use.
                            DALS.CurrentDAL.OpenConnection(DAC.DAL.ConnectionList.FirstConnection)
                            DALS.CurrentDAL.OpenConnection(DAC.DAL.ConnectionList.SecondConnection)
                        End SyncLock
                        Exit Do
                    Catch ex As Exception
                        'This is usually where I find the bottleneck
                        '"Transaction context in use by another session" is the exception that I get
                        Thread.Sleep(100)
                    End Try
                Loop

                '*****************
                'Do some work here
                '*****************

                Trans.Complete()
            End Using
        End Using
        DTX.Complete()
    End Sub

编辑

我的测试最终表明这是不可能的。即使有多个连接或使用同一个连接,事务中的所有请求或问题也会被顺序处理。

也许他们将来会改变这种行为。

最佳答案

首先,您必须将在这里和那里阅读的有关 SQL Server 事务的内容分为两种不同的情况:本地和分布式。

本地 SQL 事务 :

  • SQL Server 只允许对每个本地事务执行一个请求。
  • 默认情况下,只有一个 session 可以注册本地事务。使用 sp_getbindtoken 和 sp_bindsession 可以在本地事务中注册多个 session 。 session 仍然仅限于在任何时候执行一个请求。
  • 使用多个事件结果集 (MARS),一个 session 可以执行多个请求。所有请求都必须注册到同一个本地事务中。

  • 分布式事务 :
  • 多个 session 可以将其本地事务注册到单个分布式事务中。
  • 每个 session 仍然登记在本地事务中,受上述本地事务的所有限制
  • 在分布式事务中注册的本地事务受分布式事务协调的两阶段提交
  • 在分布式事务中注册的实例上的所有本地事务仍然是独立的本地事务,主要意味着它们具有冲突的锁命名空间。

  • 因此,当客户端创建一个 .Net TransactionScope 并且在此事务范围下它在同一服务器上执行多个请求时,这些请求都是在分布式事务中注册的本地事务。一个简单的例子:
    class Program
        {
            static string sqlBatch = @"
    set nocount on;
    declare @i int;
    set @i = 0;
    while @i < 100000
    begin
        insert into test (a) values (replicate('a',100));
        set @i = @i+1;
    end";
    
            static void Main(string[] args)
            {
                try
                {
                    TransactionOptions to = new TransactionOptions();
                    to.IsolationLevel = IsolationLevel.ReadCommitted;
                    using (TransactionScope scp = new TransactionScope(TransactionScopeOption.Required, to))
                    {
                        using (SqlConnection connA = new SqlConnection(Settings.Default.connString))
                        {
                            connA.Open();
                            using (SqlConnection connB = new SqlConnection(Settings.Default.connString))
                            {
                                connB.Open();
    
                                SqlCommand cmdA = new SqlCommand(sqlBatch, connA);
                                SqlCommand cmdB = new SqlCommand(sqlBatch, connB);
    
                                IAsyncResult arA = cmdA.BeginExecuteNonQuery();
                                IAsyncResult arB = cmdB.BeginExecuteNonQuery();
    
                                WaitHandle.WaitAll(new WaitHandle[] { arA.AsyncWaitHandle, arB.AsyncWaitHandle });
    
                                cmdA.EndExecuteNonQuery(arA);
                                cmdB.EndExecuteNonQuery(arB);
                            }
                        }
                        scp.Complete();
                    }
                }
                catch (Exception e)
                {
                    Console.Error.Write(e);
                }
            }
        }
    

    创建一个虚拟测试表:
    create table test (id int not null identity(1,1) primary key, a varchar(100));
    

    并运行我的示例中的代码。您将看到两个请求并行执行,每个请求都在表中生成 100k 行,然后在事务范围完成时提交。因此,您看到的问题与 SQL Server 和 TransactionScope 无关,它们可以轻松处理您描述的场景。此外,代码非常简单明了,不需要创建依赖事务、进行克隆或提升事务。

    更新

    使用显式线程和相关事务:
     private class ThreadState
        {
            public DependentTransaction Transaction {get; set;}
            public EventWaitHandle Done {get; set;}
            public SqlConnection Connection { get; set; }
        }
        static void Main(string[] args)
        {
            try
            {
                TransactionOptions to = new TransactionOptions();
                to.IsolationLevel = IsolationLevel.ReadCommitted;
                using (TransactionScope scp = new TransactionScope(TransactionScopeOption.Required, to))
                {
                    ThreadState stateA = new ThreadState 
                    {
                        Transaction = Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete),
                        Done = new AutoResetEvent(false),
                        Connection = new SqlConnection(Settings.Default.connString),
                    };
                    stateA.Connection.Open();
                    ThreadState stateB = new ThreadState
                    {
                        Transaction = Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete),
                        Done = new AutoResetEvent(false),
                        Connection = new SqlConnection(Settings.Default.connString),
                    };
                    stateB.Connection.Open();
    
                    ThreadPool.QueueUserWorkItem(new WaitCallback(Worker), stateA);
                    ThreadPool.QueueUserWorkItem(new WaitCallback(Worker), stateB);
    
                    WaitHandle.WaitAll(new WaitHandle[] { stateA.Done, stateB.Done });
    
                    scp.Complete();
    
                    //TODO: dispose the open connections
                }
    
            }
            catch (Exception e)
            {
                Console.Error.Write(e);
            }
        }
    
        private static void Worker(object args)
        {
            Debug.Assert(args is ThreadState);
            ThreadState state = (ThreadState) args;
            try
            {
                using (TransactionScope scp = new TransactionScope(state.Transaction))
                {
                    SqlCommand cmd = new SqlCommand(sqlBatch, state.Connection);
                    cmd.ExecuteNonQuery();
                    scp.Complete();
                }
                state.Transaction.Complete();
            }
            catch (Exception e)
            {
                Console.Error.WriteLine(e);
                state.Transaction.Rollback();
            }
            finally
            {
                state.Done.Set();
            }
    
        }
    

    关于sql - 分布式事务如何在线程环境中与同一个 DB 的多个连接一起工作?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/2186698/

    相关文章:

    java - 比例大小无效。不能小于零

    c# - EntityFramework 同表多对多关系

    javascript - Web Worker 被 Chrome 中的主线程阻塞

    .net - 如何确定 SqlConnection 是否被登记到 System.Transactions 的 tx 中?

    asp.net-mvc-4 - 将 WebSercurity.CreateAccount 与 TrasactionScope 中的其他查询一起使用,不启用 DTC

    sql - Rails 3 有多个参数

    PHP/MySql 多个下拉列表不希望选项值在页面源中可见

    c++ - 多线程时如何避免堆栈空间不足?

    java - 没有 volatile 的双重检查锁定

    c# - 如何针对 Azure SQL 数据库使用嵌套 TransactionScope