.net - c# 全双工异步命名管道 .NET

标签 .net asynchronous named-pipes

我正在尝试在 2 台不同的机器(仅)上实现全双工客户端-服务器通信方案,其中每个端点(客户端或服务器)可以随时异步发送内容(非阻塞管道),并且另一端会拿起它并阅读它。
(请不要通过引用命名管道以外的其他技术来回答;我需要使用这种特殊方法的答案。)
我读过命名管道必须是单向的,否则它们会锁定,但我猜这可能是错误的。我认为管道是基于套接字的,我无法想象底层套接字只是单向的。
这个问题的任何答案都需要解决这些问题才能真正有用:

  • 答案需要解决异步管道,我不能使用同步解决方案。
  • 答案需要证明或允许管道保持打开的事实。我厌倦了阅读打开管道、传输字符串、然后立即关闭管道的示例。我想要一个假设管道保持打开并在随机时间传输大量垃圾并不断重复的答案。没有挂起。
  • 基于C#的解决方案

  • 我很抱歉听起来苛刻和流鼻涕,但经过几天的互联网搜索,我仍然没有找到一个好的例子,我不想使用 WCF。如果你知道这个答案的细节并且回答得很好,我敢肯定,这个话题将成为 future 时代的真正赢家。如果我弄清楚了,我会自己发布答案。
    如果您要写并说“您需要使用两个管道”,请解释原因,以及您如何知道这是真的,因为我读过的任何内容都没有解释为什么会这样。
    谢谢!

    最佳答案

    您不必使用两个管道。我在网上找到了很多答案,说明您需要使用两个管道。我四处寻找,熬夜,试了又试,然后想出了怎么做, super 简单,但你必须把所有事情都做好(尤其是把事情按照正确的调用顺序),否则它就行不通了.另一个技巧是始终确保您有一个未完成的读取调用,否则它也会被锁定。在你知道有人在读之前不要写。除非您先设置了事件,否则不要开始读取调用。那种事。

    这是我正在使用的管道类。它可能不够健壮,无法处理管道错误、关闭和溢出。

    好的,我不知道这里出了什么问题,但是格式有点不对!
    vvvv

    namespace Squall
    {
        public interface PipeSender
        {
            Task SendCommandAsync(PipeCommandPlusString pCmd);
        }
    
        /******************************************************************************
         * 
         * 
         * 
         * 
         ******************************************************************************/
        public class ClientPipe : BasicPipe
        {
            NamedPipeClientStream m_pPipe;
    
            public ClientPipe(string szServerName, string szPipeName)
                : base("Client")
            {
                m_szPipeName = szPipeName; // debugging
                m_pPipe = new NamedPipeClientStream(szServerName, szPipeName, PipeDirection.InOut, PipeOptions.Asynchronous);
                base.SetPipeStream(m_pPipe); // inform base class what to read/write from
            }
    
            public void Connect()
            {
                Debug.WriteLine("Pipe " + FullPipeNameDebug() + " connecting to server");
                m_pPipe.Connect(); // doesn't seem to be an async method for this routine. just a timeout.
                StartReadingAsync();
            }
    
            // the client's pipe index is always 0
            internal override int PipeId() { return 0; }
        }
    
        /******************************************************************************
         * 
         * 
         * 
         * 
         ******************************************************************************/
        public class ServerPipe : BasicPipe
        {
            public event EventHandler<EventArgs> GotConnectionEvent;
    
            NamedPipeServerStream m_pPipe;
            int m_nPipeId;
    
            public ServerPipe(string szPipeName, int nPipeId)
                : base("Server")
            {
                m_szPipeName = szPipeName;
                m_nPipeId = nPipeId;
                m_pPipe = new NamedPipeServerStream(
                    szPipeName,
                    PipeDirection.InOut,
                    NamedPipeServerStream.MaxAllowedServerInstances,
                    PipeTransmissionMode.Message,
                    PipeOptions.Asynchronous);
                base.SetPipeStream(m_pPipe);
                m_pPipe.BeginWaitForConnection(new AsyncCallback(StaticGotPipeConnection), this);
            }
    
            static void StaticGotPipeConnection(IAsyncResult pAsyncResult)
            {
                ServerPipe pThis = pAsyncResult.AsyncState as ServerPipe;
                pThis.GotPipeConnection(pAsyncResult);
            }
    
            void GotPipeConnection(IAsyncResult pAsyncResult)
            {
                m_pPipe.EndWaitForConnection(pAsyncResult);
    
                Debug.WriteLine("Server Pipe " + m_szPipeName + " got a connection");
    
                if (GotConnectionEvent != null)
                {
                    GotConnectionEvent(this, new EventArgs());
                }
    
                // lodge the first read request to get us going
                //
                StartReadingAsync();
            }
    
            internal override int PipeId() { return m_nPipeId; }
        }
    
        /******************************************************************************
         * 
         * 
         * 
         * 
         ******************************************************************************/
    
        public abstract class BasicPipe : PipeSender
        {
            public static int MaxLen = 1024 * 1024; // why not
            protected string m_szPipeName;
            protected string m_szDebugPipeName;
    
            public event EventHandler<PipeEventArgs> ReadDataEvent;
            public event EventHandler<EventArgs> PipeClosedEvent;
    
            protected byte[] m_pPipeBuffer = new byte[BasicPipe.MaxLen];
    
            PipeStream m_pPipeStream;
    
            public BasicPipe(string szDebugPipeName)
            {
                m_szDebugPipeName = szDebugPipeName;
            }
    
            protected void SetPipeStream(PipeStream p)
            {
                m_pPipeStream = p;
            }
    
            protected string FullPipeNameDebug()
            {
                return m_szDebugPipeName + "-" + m_szPipeName;
            }
    
            internal abstract int PipeId();
    
            public void Close()
            {
                m_pPipeStream.WaitForPipeDrain();
                m_pPipeStream.Close();
                m_pPipeStream.Dispose();
                m_pPipeStream = null;
            }
    
            // called when Server pipe gets a connection, or when Client pipe is created
            public void StartReadingAsync()
            {
                Debug.WriteLine("Pipe " + FullPipeNameDebug() + " calling ReadAsync");
    
                // okay we're connected, now immediately listen for incoming buffers
                //
                byte[] pBuffer = new byte[MaxLen];
                m_pPipeStream.ReadAsync(pBuffer, 0, MaxLen).ContinueWith(t =>
                {
                    Debug.WriteLine("Pipe " + FullPipeNameDebug() + " finished a read request");
    
                    int ReadLen = t.Result;
                    if (ReadLen == 0)
                    {
                        Debug.WriteLine("Got a null read length, remote pipe was closed");
                        if (PipeClosedEvent != null)
                        {
                            PipeClosedEvent(this, new EventArgs());
                        }
                        return;
                    }
    
                    if (ReadDataEvent != null)
                    {
                        ReadDataEvent(this, new PipeEventArgs(pBuffer, ReadLen));
                    }
                    else
                    {
                        Debug.Assert(false, "something happened");
                    }
    
                    // lodge ANOTHER read request
                    //
                    StartReadingAsync();
    
                });
            }
    
            protected Task WriteByteArray(byte[] pBytes)
            {
                // this will start writing, but does it copy the memory before returning?
                return m_pPipeStream.WriteAsync(pBytes, 0, pBytes.Length);
            }
    
            public Task SendCommandAsync(PipeCommandPlusString pCmd)
            {
                Debug.WriteLine("Pipe " + FullPipeNameDebug() + ", writing " + pCmd.GetCommand() + "-" + pCmd.GetTransmittedString());
                string szSerializedCmd = JsonConvert.SerializeObject(pCmd);
                byte[] pSerializedCmd = Misc.StringToBytes(szSerializedCmd);
                Task t = WriteByteArray(pSerializedCmd);
                return t;
            }
        }
    
        /******************************************************************************
         * 
         * 
         * 
         * 
         ******************************************************************************/
    
        public class PipeEventArgs
        {
            public byte[] m_pData;
            public int m_nDataLen;
    
            public PipeEventArgs(byte[] pData, int nDataLen)
            {
                // is this a copy, or an alias copy? I can't remember right now.
                m_pData = pData;
                m_nDataLen = nDataLen;
            }
        }
    
        /******************************************************************************
         * if we're just going to send a string back and forth, then we can use this
         * class. It it allows us to get the bytes as a string. sort of silly.
         ******************************************************************************/
    
        [Serializable]
        public class PipeCommandPlusString
        {
            public string m_szCommand;  // must be public to be serialized
            public string m_szString;   // ditto
    
            public PipeCommandPlusString(string sz, string szString)
            {
                m_szCommand = sz;
                m_szString = szString;
            }
    
            public string GetCommand()
            {
                return m_szCommand;
            }
    
            public string GetTransmittedString()
            {
                return m_szString;
            }
        }
    }
    

    这是我的管道测试,在一个进程上运行。它也运行在两个进程上,我检查了
    namespace NamedPipeTest
    {
        public partial class Form1 : Form
        {
            SynchronizationContext _context;
            Thread m_pThread = null;
            volatile bool m_bDieThreadDie;
            ServerPipe m_pServerPipe;
            ClientPipe m_pClientPipe;
    
            public Form1()
            {
                InitializeComponent();
            }
    
            private void Form1_Load(object sender, EventArgs e)
            {
                _context = SynchronizationContext.Current;
    
                m_pServerPipe = new ServerPipe("SQUALL_PIPE", 0);
                m_pServerPipe.ReadDataEvent += M_pServerPipe_ReadDataEvent;
                m_pServerPipe.PipeClosedEvent += M_pServerPipe_PipeClosedEvent;
    
                // m_pThread = new Thread(StaticThreadProc);
                // m_pThread.Start( this );
            }
    
            private void M_pServerPipe_PipeClosedEvent(object sender, EventArgs e)
            {
                Debug.WriteLine("Server: Pipe was closed, shutting down");
    
                // have to post this on the main thread
                _context.Post(delegate
                {
                    Close();
                }, null);
            }
    
            private void M_pServerPipe_ReadDataEvent(object sender, PipeEventArgs e)
            {
                // this gets called on an anonymous thread
    
                byte[] pBytes = e.m_pData;
                string szBytes = Misc.BytesToString(pBytes, e.m_pData.Length);
                PipeCommandPlusString pCmd = JsonConvert.DeserializeObject<PipeCommandPlusString>(szBytes);
                string szValue = pCmd.GetTransmittedString();
    
                if (szValue == "CONNECT")
                {
                    Debug.WriteLine("Got command from client: " + pCmd.GetCommand() + "-" + pCmd.GetTransmittedString() + ", writing command back to client");
                    PipeCommandPlusString pCmdToSend = new PipeCommandPlusString("SERVER", "CONNECTED");
                    // fire off an async write
                    Task t = m_pServerPipe.SendCommandAsync(pCmdToSend);
                }
            }
    
            static void StaticThreadProc(Object o)
            {
                Form1 pThis = o as Form1;
                pThis.ThreadProc();
            }
    
            void ThreadProc()
            {
                m_pClientPipe = new ClientPipe(".", "SQUALL_PIPE");
                m_pClientPipe.ReadDataEvent += PClientPipe_ReadDataEvent;
                m_pClientPipe.PipeClosedEvent += M_pClientPipe_PipeClosedEvent;
                m_pClientPipe.Connect();
    
                PipeCommandPlusString pCmd = new PipeCommandPlusString("CLIENT", "CONNECT");
                int Counter = 1;
                while (Counter++ < 10)
                {
                    Debug.WriteLine("Counter = " + Counter);
                    m_pClientPipe.SendCommandAsync(pCmd);
                    Thread.Sleep(3000);
                }
    
                while (!m_bDieThreadDie)
                {
                    Thread.Sleep(1000);
                }
    
                m_pClientPipe.ReadDataEvent -= PClientPipe_ReadDataEvent;
                m_pClientPipe.PipeClosedEvent -= M_pClientPipe_PipeClosedEvent;
                m_pClientPipe.Close();
                m_pClientPipe = null;
            }
    
            private void M_pClientPipe_PipeClosedEvent(object sender, EventArgs e)
            {
                // wait around for server to shut us down
            }
    
            private void PClientPipe_ReadDataEvent(object sender, PipeEventArgs e)
            {
                byte[] pBytes = e.m_pData;
                string szBytes = Misc.BytesToString(pBytes, e.m_nDataLen);
                PipeCommandPlusString pCmd = JsonConvert.DeserializeObject<PipeCommandPlusString>(szBytes);
                string szValue = pCmd.GetTransmittedString();
    
                Debug.WriteLine("Got command from server: " + pCmd.GetCommand() + "-" + pCmd.GetTransmittedString());
    
                if (szValue == "CONNECTED")
                {
                    PipeCommandPlusString pCmdToSend = new PipeCommandPlusString("CLIENT", "DATA");
                    m_pClientPipe.SendCommandAsync(pCmdToSend);
                }
            }
    
            private void Form1_FormClosing(object sender, FormClosingEventArgs e)
            {
                if (m_pThread != null)
                {
                    m_bDieThreadDie = true;
                    m_pThread.Join();
                    m_bDieThreadDie = false;
                }
    
                m_pServerPipe.ReadDataEvent -= M_pServerPipe_ReadDataEvent;
                m_pServerPipe.PipeClosedEvent -= M_pServerPipe_PipeClosedEvent;
                m_pServerPipe.Close();
                m_pServerPipe = null;
    
            }
        }
    }
    

    关于.net - c# 全双工异步命名管道 .NET,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34478513/

    相关文章:

    angular - 在带有异步管道的 *ngFor 中使用带有 http-call 的函数时无限循环

    c - 如何在命名管道 (mkfifo) 上执行非阻塞 fopen?

    c# - 我如何在 namedPipeWrapper 中在服务器和客户端之间推送消息?

    c# - 从 switch block 中跳出 foreach 循环

    .net - 在 ASP.NET 中创建大字符串时出现 OutOfMemoryException

    .net - WCF异步回调

    linux - 从管道获取命令后进程退出

    .net - 如何检索 DateTimePicker 的下拉状态?

    c# - 什么 .NET 集合提供最快的搜索

    java - 使用异步 http 提高吞吐量