c# - 从不同线程读取和写入相同的内存

标签 c# .net multithreading async-await thread-synchronization

我有一个简单的类,它异步发送请求。

public class MyClass
{
    private readonly ISender _sender;

    public MyClass(ISender sender)
    {
        _sender = sender;
    }

    public Task<string> SendAsync(string input, CancellationToken cancellationToken)
    {
        return _sender.SendAsync(input, cancellationToken);
    }
}

public interface ISender
{
    Task<string> SendAsync(string input, CancellationToken cancellationToken);
}

一切看起来都很简单,直到满足以下要求:_sender 可以在运行时更改。

MyClass 的新实现:

public class MyClass
{
    private readonly ISender _sender;

    public MyClass(ISender sender)
    {
        _sender = sender;
    }

    public Task<string> SendAsync(string input, CancellationToken cancellationToken)
    {
        return _sender.SendAsync(input, cancellationToken);
    }

    public void SenderChanged(object unused, SenderEventArgs e)
    {
        ISender previous = Interlocked.Exchange(ref _sender, SenderFactory.Create(e.NewSenderConfig));
        previous.Dispose();
    }
}

显然这段代码不是线程安全的。我需要在 SendAsyncSenderChanged 中引入 lock 以确保 _sender 始终是最新的对象。 但我希望每天调用 SenderChanged 一次,并且 SendAsync(读取 _sender 对象)每秒调用 10000 次。
锁定和上下文切换会降低这段代码的性能。

有没有可能通过低级锁定来处理这个问题?或者知道上述要求后你会如何解决这个问题?

最佳答案

通常的方法是使用读写锁,特别是 ReaderWriterLockSlim 。这是一个类似监视器的锁,可针对频繁的读取访问和不频繁的写入访问进行优化,并且它支持多个并发读取器和单个写入器,这似乎正是您的用例。

但是,它的成本似乎适中。我编写了两个测试 - 一个使用 ReaderWriterLockSlim 来正确执行操作,另一个使用您的实现,唯一的更改是已处理异常重试循环。就我而言,我更换了发件人 20 次,每 10 秒更换一次。这比您建议的用例要短得多,但确实可以作为性能差异的估计。

最后:

  • 读取器/写入器锁每毫秒能够通过 2878 个工作单元。
  • 经过重试的裸机能够达到每毫秒 9940 个工作单位。

其中“工作单元”调用 DoWork 方法,该方法调用 Thread.SpinWait(100)。如果您想自己测试一下,下面发布了代码。

编辑:

我调整了 Thread.SpinWait() 调用,以改变锁定与“工作”所花费的时间之间的平衡。在我的机器上,旋转等待时间约为 900-1000,两种实现的运行速度相同,约为 1000 个工作单元/毫秒。从上面的结果来看,这一点应该是显而易见的,但我确实只想运行健全性检查。

事实上,原始结果表明我们能够使用锁每秒处理大约 280 万个请求;至少在我的机器上是 4 核 Intel CPU,“Intel Core 2 Quad CPU Q9650 @ 3.00 GHz”。鉴于您正在争取每秒 10k 请求,在锁定开始成为 CPU 使用率的重要比例之前,您似乎已经有了大约一个数量级的空间。


#define USE_READERWRITER

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Windows.Forms;

namespace TestProject
{
    static class Program
    {
        /// <summary>
        /// The main entry point for the application.
        /// </summary>
        [STAThread]
        static void Main()
        {
            SenderDispatch dispatch = new SenderDispatch();

            List<Worker> workers = new List<Worker>();

            workers.Add( new Worker( dispatch, "A" ) );
            workers.Add( new Worker( dispatch, "B" ) );
            workers.Add( new Worker( dispatch, "C" ) );
            workers.Add( new Worker( dispatch, "D" ) );

            Thread.CurrentThread.Name = "Main thread";
            Process.GetCurrentProcess().PriorityClass = ProcessPriorityClass.High;

            Stopwatch watch = new Stopwatch();

            watch.Start();
            workers.ForEach( x => x.Start() );

            for( int i = 0; i < 20; i++ )
            {
                Thread.Sleep( 10000 );
                dispatch.NewSender();
            }

            Console.WriteLine( "Stopping..." );

            workers.ForEach( x => x.Stop() );
            watch.Stop();

            Console.WriteLine( "Stopped" );

            long sum = workers.Sum( x => x.FinalCount );

            string message = 
                "Sum of worker iterations: " + sum.ToString( "n0" ) + "\r\n" +
                "Total time:               " + ( watch.ElapsedMilliseconds / 1000.0 ).ToString( "0.000" ) + "\r\n" +
                "Iterations/ms:            " + sum / watch.ElapsedMilliseconds;

            MessageBox.Show( message );
        }
    }

    public class Worker
    {
        private SenderDispatch dispatcher;
        private Thread thread;
        private bool working;

        private string workerName;

        public Worker( SenderDispatch dispatcher, string workerName )
        {
            this.dispatcher = dispatcher;
            this.workerName = workerName;

            this.working = false;
        }

        public long FinalCount { get; private set; }

        public void Start()
        {
            this.thread = new Thread( Run );
            this.thread.Name = "Worker " + this.workerName;

            this.working = true;
            this.thread.Start();
        }

        private void Run()
        {
            long state = 0;

            while( this.working )
            {
                this.dispatcher.DoOperation( workerName, state );
                state++;
            }

            this.FinalCount = state;
        }

        public void Stop()
        {
            this.working = false;

            this.thread.Join();
        }
    }

    public class SenderDispatch
    {
        private Sender sender;

        private ReaderWriterLockSlim senderLock;

        public SenderDispatch()
        {
            this.sender = new Sender();
            this.senderLock = new ReaderWriterLockSlim( LockRecursionPolicy.NoRecursion );
        }

        public void DoOperation( string workerName, long value )
        {

#if USE_READERWRITER
            this.senderLock.EnterReadLock();
            try
            {
                this.sender.DoOperation( workerName, value );
            }
            finally
            {
                this.senderLock.ExitReadLock();
            }
#else 
            bool done = false;

            do
            {
                try
                {
                    this.sender.DoOperation( workerName, value );
                    done = true;
                }
                catch (ObjectDisposedException) { }
            }
            while( !done );
#endif

        }

        public void NewSender()
        {
            Sender prevSender;
            Sender newSender;

            newSender = new Sender();

#if USE_READERWRITER
            this.senderLock.EnterWriteLock();
            try
            {
                prevSender = Interlocked.Exchange( ref this.sender, newSender );
            }
            finally
            {
                this.senderLock.ExitWriteLock();
            }
#else
            prevSender = Interlocked.Exchange( ref this.sender, newSender );
            prevSender.Dispose();

#endif
            prevSender.Dispose();

        }
    }

    public class Sender : IDisposable
    {
        private bool disposed;

        public Sender()
        {
            this.disposed = false;
        }

        public void DoOperation( string workerName, long value )
        {
            if( this.disposed )
            {
                throw new ObjectDisposedException( 
                    "Sender",
                    string.Format( "Worker {0} tried to queue work item {1}", workerName, value ) 
                );
            }

            Thread.SpinWait( 100 );
        }

        public void Dispose()
        {
            this.disposed = true;
        }
    }
}

关于c# - 从不同线程读取和写入相同的内存,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29519563/

相关文章:

.net - 什么可能导致空白 XML 文件?

c# - 为什么我会收到 System.OutOfMemoryException

C#/Mysql 测验仅 5 个问题

c# - 在 Entity Framework Core 中编写实体 POCO 类的正确方法是什么?

.net - 获取序列的基础类型(数组、列表、seq)

java - Java 同步块(synchronized block)/代码上的线程访问

c# - 如何创建新的 Windows 窗体,并将其与已存在的线程关联?

java - 使用可运行类

c# - 重定向用于引导的命令行参数

c# - mvc3 Razor 中的复选框列表示例,