c# - 使用 SharedMemory MMF 实现快速 .NET 无锁进程间

标签 c# multithreading performance ipc wait-free

我是 多任务处理IPC 的新手,我正在尝试构建一种使用共享内存进行快速进程间通信的方法(起初我正在研究 IPC 术语,记住 wcf socketsnamed pipes 只是为了最终发现 MMF)。

现在我已经通过使用 LockEventWaitHandle 信号在两个进程之间使用共享内存成功地实现了一个小测试,我正在寻找一种实现非阻塞的方法/无需等待模式。现在,我正在尝试结合 Thread.MemoryBarrier() 并从 MemoryMapedFile 读取一个signalling Sector

问题不明!第一轮通过,第二轮最后位于百慕大三角……超出调试器的范围……

假设 进程 a 正在向 进程 b 发送对 a showMsg() 的突发请求。

                                         //offset positions in mmf
MemoryMappedViewAccessor MmfAcc; const int opReady= 0, opCompleteRead = 4, .....



ReadTrd()
{
  //[0,3] - Reader is stationed
  //[4,7] - Read Complete successfully
  //[8,11] - Data-size 
  //[12,15] - Reader-exiting
  "format" the signals Section (write zeroes). 

   for(;;){if (WrTrd-StepMMF1  Confimed) break;}
  MmfAcc- read DataSize val @offset[8]
  MmfAcc- read Data val @offset[50]
  MmfAcc Write exit to offset....
  ....heavy use of  Thread.MemoryBarrier(); sets !!! (all over the place, on every shared variable...)

}

writeTrd()
{
  heavy use of  Thread.MemoryBarrier() !!!
  //[15-19] - wr is stationed
  //[20-23] - wr Complete successfully
  //[24-27] - wrExiting
  "format" the signals Section . 
   for(;;){if Reader-StepMMF1 is Confim break;}
   MmfAcc- DataSize to offset[8]
   write Data To offset[50] using the method below
   for(;;){if Read StepMMF2 is Confim break;}
 } 

由于我是第一次使用命名管道解决方案,因此与命名管道方法相比,Mmf 方法(尽管有 Lock 和 EventWaitHandle)获得了巨大的性能提升,但我什至以某种方式进一步使用上述方法..?

我可以像 strip 化 Raid 一样克隆这种模式 ...

Reader1 + Reader2 & WriteThred1  + WriteThread2

所以我尝试了一下,然后就卡住了。

这种使用全内存栅栏和共享内存进行信号传输的方法是否有效?

如果是这样,剩下的就是看第二次迭代失败的原因,性能差异。

编辑 - 添加额外线程测试背后的逻辑

这是我用来操纵编写器线程的“桥”(读者使用相同的方法。

public void Write(byte[] parCurData)
{
    if (ReadPosition < 0 || WritePosition < 0)
        throw new ArgumentException();
    this.statusSet.Add("ReadWrite:-> " + ReadPosition + "-" + WritePosition);
    // var s = (FsMomitorIPCCrier)data;

    ////////lock (this.dataToSend)
    ////////{
    Thread.MemoryBarrier();
        LiveDataCount_CurIndex = dataQue.Where(i => i != null).Count();
    this.dataQue[LiveDataCount_CurIndex] = parCurData;

    Console.WriteLine("^^^^^" + Thread.CurrentThread.Name + " has Entered WritingThreads BRIDGE");
    Console.WriteLine("^^^^^[transactionsQue] = {1}{0}^^^^^[dataQue.LiveDataASIndex = {2}{0}^^^^^[Current Requests Count = {3}{0}", "\r\n", Wtransactions, LiveDataCount_CurIndex, ++dataDelReqCount);

    //this.itsTimeForWTrd2 = false;

    if (Wtransactions != 0 && Wtransactions > ThrededSafeQ_Initial_Capcity - 1)
    if (this.dataQueISFluded) this.DataQXpand();


    if (itsTimeForWTrd2)
    {

        bool firstWt = true;
        while (writerThread2Running)
        {
            if (!firstWt) continue;
            Console.WriteLine("SECOND WRITERThread [2] is In The CoffeeCorner");
                    firstWt=false;
        }

        this.dataDelivery2 = this.dataQue[LiveDataCount_CurIndex];
        Console.WriteLine("Activating SECOND WRITERThread [2]");
        itsTimeForWTrd2 = false;

        writerThread2Running = true;
        //writerThread1Running = true;
        writerThread2 = new System.Threading.Thread(WriterThread2);
        writerThread2.IsBackground = true;
        writerThread2.Name = this.DepoThreadName + "=[WRITER2]";
        writerThread2.Start();

    }
    else
    {
        bool firstWt = true;
        while (writerThread1Running)
        {
            if (!firstWt)continue;
                Console.WriteLine("WRITERThread [1] is In The CoffeeCorner");
            firstWt=false;
        }
        Console.WriteLine("Activating WRITERThread [1]");
        this.dataDelivery1 = this.dataQue[LiveDataCount_CurIndex]; 

        writerThread1Running = true;
        writerThread1 = new System.Threading.Thread(WriterThread1);
        writerThread1.IsBackground = true;
        writerThread1.Name = this.DepoThreadName+"=[WRITER1]";
        writerThread1.Start();
        itsTimeForWTrd2 = true;

    }
    Thread.MemoryBarrier();
}

使用写句柄读取和写入实际数据(与写入类似的代码)

public unsafe byte[] UsReadBytes(int offset, int num)
{
    byte[] arr = new byte[num];
    byte* ptr = (byte*)0;
    this.accessor.SafeMemoryMappedViewHandle.AcquirePointer(ref ptr);
    Marshal.Copy(IntPtr.Add(new IntPtr(ptr), offset), arr, 0, num);
    this.accessor.SafeMemoryMappedViewHandle.ReleasePointer();
    return arr;
}

正如我所说,我已经通过非阻塞/无等待等研究了数据和共享内存的同步问题。信号量锁所以我试图在每个数据事务处理过程中消除任何类型的开销进入共享内存映射文件。我在这里想问的是,消除 Lock 和 EventWaitHandle 并将其替换为内存栅栏逻辑和通过 mmf 发出信号的问题可能是什么?

最佳答案

如果您计划将其用于研发以外的特定目的,最简单的方法是使用已经提供该功能的库。一种思考方式是,无锁 = 消息传递。两种可能的方法是:对于极简的消息传递实现,ZeroMQ IPC 提供了强大的 .Net 支持和卓越的 IPC 性能(http://www.codeproject.com/Articles/488207/ZeroMQ-via-Csharp-Introduction),对于更完整的 Actor-Model 实现(包括对 IPC 的支持),请查看 Akka .net(http://getakka.net/docs/#networking)。

如果本质上更多的是研发目的(也就是说,您想要编写自己的实现,这很酷),我仍然建议查看这些产品的源代码(尤其是 Akka .net,因为它是用 C# 编写的),用于实现有关消息传递和基于 Actor 的 IPC 的想法。

关于c# - 使用 SharedMemory MMF 实现快速 .NET 无锁进程间,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35606986/

相关文章:

Java - 在特定点上同步线程

android - 如何在android中测试执行时间的功能?

c# - 文件资源管理器打开时如何修改目录时间戳?

c# - 无法将 T 转换为间隔(我自己的自定义类型)

c# - 我怎样才能让 Selenium-WebDriver 在发送 key 后等待几秒钟?

android - 第二次运行线程使应用程序崩溃

c# - 静态方法是否共享其局部变量以及在不同线程并发使用期间会发生什么?

html - 有长缓存过期 : need to rename background img files as well as css?

c# - 静态属性/函数的性能

c# - WPF 数据网格和 Tab 键