c# SocketAsyncEventArgs 阻塞 ReceiveAsync 处理程序中的代码

标签 c# multithreading sockets asynchronous async-await

我正在测试以下两种情况,一种有效,但另一种无效。 我有套接字服务器和套接字客户端应用程序在两台不同的机器上运行

这两个场景都使用了 socketasynceventargs

场景 1(有效) 循环创建 40k 套接字客户端,等待所有连接建立,然后所有客户端同时向服务器发送消息并从服务器接收响应 10 次(即发送/接收发生 10 次)。

场景 2(不起作用。我收到很多连接拒绝错误) 在循环中创建 40k 套接字客户端,并在每个客户端连接后立即向服务器发送/接收相同的 10 条消息,而不是等待 40k 连接建立。

我无法弄清楚为什么我的第二种情况会失败。我知道在场景 1 中,服务器在建立所有 40k 连接之前不会做太多事情。但它能够同时与所有客户端通信。有什么想法吗??

感谢您的耐心等待。

这里是套接字服务器代码

public class SocketServer
    {

   private static System.Timers.Timer MonitorTimer = new System.Timers.Timer();
        public static SocketServerMonitor socket_monitor = new SocketServerMonitor();
        private int m_numConnections; 
        private int m_receiveBufferSize;
        public static BufferManager m_bufferManager;  
        Socket listenSocket;           

        public static SocketAsyncEventArgsPool m_readWritePool;
        public static int m_numConnectedSockets;    
        private int cnt = 0;

        public static int Closecalled=0;


        public SocketServer(int numConnections, int receiveBufferSize)
        {
            m_numConnectedSockets = 0;
            m_numConnections = numConnections;
            m_receiveBufferSize = receiveBufferSize;


            m_bufferManager = new BufferManager(receiveBufferSize * numConnections ,
               receiveBufferSize);

            m_readWritePool = new SocketAsyncEventArgsPool(numConnections);

        }


        public void Init()
        {
            MonitorTimer.Interval = 30000;
            MonitorTimer.Start();
            MonitorTimer.Elapsed += new System.Timers.ElapsedEventHandler(socket_monitor.Log);


            m_bufferManager.InitBuffer();


            SocketAsyncEventArgs readWriteEventArg;

            for (int i = 0; i < m_numConnections; i++)
            {

                readWriteEventArg = new SocketAsyncEventArgs();

                m_readWritePool.Push(readWriteEventArg);
            }

        }


        public void Start(IPEndPoint localEndPoint)
        {

            listenSocket = new Socket(localEndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
            listenSocket.Bind(localEndPoint);

            listenSocket.Listen(1000);


            StartAccept(null);
        }

        public void Stop()
        {
            if (listenSocket == null)
                return;
            listenSocket.Close();
            listenSocket = null;


            Thread.Sleep(15000);
        }

        private void StartAccept(SocketAsyncEventArgs acceptEventArg)
        {
            if (acceptEventArg == null)
            {
                acceptEventArg = new SocketAsyncEventArgs();
                acceptEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(AcceptEventArg_Completed);
            }
            else
            {
                // socket must be cleared since the context object is being reused
                acceptEventArg.AcceptSocket = null;
            }

            try
            {
                bool willRaiseEvent = listenSocket.AcceptAsync(acceptEventArg);
                if (!willRaiseEvent)
                {
                    ProcessAccept(acceptEventArg);
                }
            }
            catch (Exception e)
            {

            }
        }


        void AcceptEventArg_Completed(object sender, SocketAsyncEventArgs e)
        {
            ProcessAccept(e);
        }

        private void ProcessAccept(SocketAsyncEventArgs e)
        {
            Interlocked.Increment(ref m_numConnectedSockets);
            socket_monitor.IncSocketsConnected();


            SocketAsyncEventArgs readEventArgs = m_readWritePool.Pop();
            m_bufferManager.SetBuffer(readEventArgs);

            readEventArgs.UserToken = new AsyncUserToken { id = cnt++, StarTime = DateTime.Now };
            readEventArgs.AcceptSocket = e.AcceptSocket;
            SocketHandler handler=new SocketHandler(readEventArgs);

            StartAccept(e);
        }

    }   






class SocketHandler
    {
        private SocketAsyncEventArgs _socketEventArgs;

        public SocketHandler(SocketAsyncEventArgs socketAsyncEventArgs)
        {
            _socketEventArgs = socketAsyncEventArgs;
            _socketEventArgs.Completed += new EventHandler<SocketAsyncEventArgs>(IO_Completed);
            StartReceive(_socketEventArgs);
        }


        private void StartReceive(SocketAsyncEventArgs receiveSendEventArgs)
        {

            bool willRaiseEvent = receiveSendEventArgs.AcceptSocket.ReceiveAsync(receiveSendEventArgs);
            if (!willRaiseEvent)
            {
                ProcessReceive(receiveSendEventArgs);
            }
        }


        private void ProcessReceive(SocketAsyncEventArgs e)
        {
            // check if the remote host closed the connection
            AsyncUserToken token = (AsyncUserToken)e.UserToken;
            //token.StarTime = DateTime.Now;
            if (e.BytesTransferred > 0 && e.SocketError == SocketError.Success)
            {
                // process the data here

                //reply to client
                byte[] AckData1 = BitConverter.GetBytes(1);
                SendData(AckData1, 0, AckData1.Length, e);


                StartReceive(e);
            }
            else
            {
                CloseClientSocket(e);
            }
        }


        private void IO_Completed(object sender, SocketAsyncEventArgs e)
        {
            // determine which type of operation just completed and call the associated handler 
            switch (e.LastOperation)
            {
                case SocketAsyncOperation.Receive:
                    ProcessReceive(e);
                    break;
                case SocketAsyncOperation.Send:
                    ProcessSend(e);
                    break;
                default:
                    throw new ArgumentException("The last operation completed on the socket was not a receive or send");
            }

        }





        private void CloseClientSocket(SocketAsyncEventArgs e)
        {

            AsyncUserToken token = e.UserToken as AsyncUserToken;

            // close the socket associated with the client 
            try
            {

                e.AcceptSocket.Shutdown(SocketShutdown.Send);
            }

            catch (Exception ex)
            {

            }

            e.AcceptSocket.Close();

            Interlocked.Decrement(ref SocketServer.m_numConnectedSockets);
            SocketServer.socket_monitor.DecSocketsConnected();

            SocketServer.m_bufferManager.FreeBuffer(e);

            e.Completed -= new EventHandler<SocketAsyncEventArgs>(IO_Completed);

            SocketServer.m_readWritePool.Push(e);

        }


        public void SendData(Byte[] data, Int32 offset, Int32 count, SocketAsyncEventArgs args)
        {

            try
            {

                Socket socket = args.AcceptSocket;
                if (socket.Connected)
                {
                    var i = socket.Send(data, offset, count, SocketFlags.None);
                }
            }
            catch (Exception Ex)
            {

            }
        }
    }

这里是在connectcallback方法中抛出错误的客户端代码

// State object for receiving data from remote device.
public class StateObject
{
    // Client socket.
    public Socket workSocket = null;
    // Size of receive buffer.
    public const int BufferSize = 256;
    // Receive buffer.
    public byte[] buffer = new byte[BufferSize];
    // Received data string.
    public StringBuilder sb = new StringBuilder();

    public int count = 0;
}

public class AsynchronousClient
{
    // The port number for the remote device.
    private const int port = 11000;

    private static int closecalled = 0;
    private static bool wait = true;
    // ManualResetEvent instances signal completion.
    private static ManualResetEvent connectDone =
        new ManualResetEvent(false);
    private static ManualResetEvent sendDone =
        new ManualResetEvent(false);
    private static ManualResetEvent receiveDone =
        new ManualResetEvent(false);

    // The response from the remote device.
    private static String response = String.Empty;

    private static void StartClient(Socket client, IPEndPoint remoteEP)
    {
        // Connect to a remote device.
        try
        {
            // Connect to the remote endpoint.
            client.BeginConnect(remoteEP,
                new AsyncCallback(ConnectCallback), new StateObject { workSocket = client });

        }
        catch (Exception e)
        {
            Console.WriteLine(e.ToString());
        }
    }

    private static void ConnectCallback(IAsyncResult ar)
    {
        try
        {
            // Retrieve the socket from the state object.
            StateObject state = (StateObject)ar.AsyncState;

            var client = state.workSocket;
            // Complete the connection.
            client.EndConnect(ar);
            var data = "AA5500C08308353816050322462F01020102191552E7D3FA52E7D3FB1FF85BF1FE9F201000004AB80000000500060800001EFFB72F0D00002973620000800000FFFFFFFF00009D6D00003278002EE16D0000018500000000000000000000003A0000000100000000828C80661FF8B436FE9EA9FC000000120000000700000000000000000000000400000000000000000000000000000000000000000000281E0000327800000000000000000000000000AF967D00000AEA000000000000000000000000";

                     Send(state, data);
        }
        catch (Exception e)
        {
            Console.WriteLine(e.ToString());
        }
    }

    private static void Receive(StateObject state)
    {
        try
        {
          Socket client = state.workSocket;
            // Begin receiving the data from the remote device.
            client.BeginReceive(state.buffer, 0, StateObject.BufferSize, 0,
                new AsyncCallback(ReceiveCallback), state);
        }
        catch (Exception e)
        {
            Console.WriteLine(e.ToString());
        }
    }

    private static void ReceiveCallback(IAsyncResult ar)
    {
        try
        {
            StateObject state = (StateObject)ar.AsyncState;
            Socket client = state.workSocket;

            // Read data from the remote device.
            int bytesRead = client.EndReceive(ar);

            //if (wait)
            //{
            //    connectDone.WaitOne();
            //}

            if (bytesRead > 0)
            {
                state.count = state.count + 1;
                byte[] b = new byte[bytesRead];
                Array.Copy(state.buffer, b, 1);
                if (b[0] == 1)
                {

                    if (state.count < 10)
                    {
                        var data = "AA5500C08308353816050322462F01020102191552E7D3FA52E7D3FB1FF85BF1FE9F201000004AB80000000500060800001EFFB72F0D00002973620000800000FFFFFFFF00009D6D00003278002EE16D0000018500000000000000000000003A0000000100000000828C80661FF8B436FE9EA9FC000000120000000700000000000000000000000400000000000000000000000000000000000000000000281E0000327800000000000000000000000000AF967D00000AEA000000000000000000000000";
                        Send(state, data);
                    }
                    else
                    {
                        Interlocked.Increment(ref closecalled);
                        Console.WriteLine("closecalled:-" + closecalled + " at " + DateTime.Now);
                        client.Close();
                    }

                }
                else
                {
                    // Get the rest of the data.
                    client.BeginReceive(state.buffer, 0, StateObject.BufferSize, 0,
                        new AsyncCallback(ReceiveCallback), state);
                }
            }
            else
            {
                client.Close();
            }
        }
        catch (Exception e)
        {
            Console.WriteLine(e.ToString());
        }
    }

    private static void Send(StateObject state, String data)
    {
        try
        {
            Socket client = state.workSocket;
            var hexlen = data.Length;
            byte[] byteData = new byte[hexlen / 2];
            int[] hexarray = new int[hexlen / 2];
            int i = 0;
            int k = 0;
            //create the byte array
            while (i < data.Length / 2)
            {
                string first = data[i].ToString();
                i++;
                string second = data[i].ToString();
                string x = first + second;
                byteData[k] = (byte)Convert.ToInt32(x, 16);
                i++;
                k++;
            }
            // Begin sending the data to the remote device.
            client.BeginSend(byteData, 0, byteData.Length, 0,
                new AsyncCallback(SendCallback), state);
        }
        catch (Exception e)
        {
            Console.WriteLine(e.ToString());
        }
    }

    private static void SendCallback(IAsyncResult ar)
    {
        try
        {
            // Retrieve the socket from the state object.
            StateObject state = (StateObject)ar.AsyncState;
            Socket client = state.workSocket;
            // Complete sending the data to the remote device.
            int bytesSent = client.EndSend(ar);
            Receive(state);
        }
        catch (Exception e)
        {
            Console.WriteLine(e.ToString());
        }
    }

    public static int Main(String[] args)
    {
        Start();
        Console.ReadLine();
        return 0;
    }

    private static void Start()
    {
        IPAddress ipaddress = IPAddress.Parse("10.20.2.152");
        IPEndPoint remoteEP = new IPEndPoint(ipaddress, port);

        for (int i = 0; i < 40000; i++)
        {
            Thread.Sleep(1);
            // Create a TCP/IP socket.
            try
            {
                Socket client = new Socket(AddressFamily.InterNetwork,
                    SocketType.Stream, ProtocolType.Tcp);
                StartClient(client, remoteEP);
            }
            catch (Exception e)
            {
                Console.WriteLine(e.ToString());
            }
            if (i == 39999)
            {
                Console.WriteLine("made all conns at " + DateTime.Now);
            }
        }
    }
}

最佳答案

我会使用线性队列来接受传入连接。像这样:

public async Task Accept40KClients()
{
    for (int i = 0; i < 40000; i++)
    {
        // Await this here   -------v
        bool willRaiseEvent = await listenSocket.AcceptAsync(acceptEventArg);
        if (!willRaiseEvent)
        {
            ProcessAccept(acceptEventArg);
        }
    }
}

如果这还不够快,也许您可​​以一次等待 10 次,但我认为这已经足够了……虽然我在这方面可能是错的。

关于c# SocketAsyncEventArgs 阻塞 ReceiveAsync 处理程序中的代码,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/24809358/

相关文章:

c# - NHibernate 3.2 按代码 (Conformist) 字典属性的类映射

c# - 是否可以将 ServicePointManager 与 Webbrowser 控件一起使用?

python - python 中的多线程和异步套接字

linux - 在 centOS 上,大量 float 的字符串流序列化比序列化 block 的 4 个 pthread-ts 更快。 std::threads 在 Windows 上更快

c++ - C++多线程关闭TCP连接

python - 没有轮询的基于选择的套接字循环

python - 为什么当套接字变得可读时,10 个选择过程中只有 2 个被通知?

c# - C# 在同一个套接字上发送+接收数据

c# - 有没有办法从 vsix 扩展检测 VS 2012 中是否有打开的解决方案?

c# - 在 Windows 启动时启动 wpf 应用程序