c# - 如何编写可扩展的基于 TCP/IP 的服务器

标签 c# .net networking tcp scalability

我正处于编写新的 Windows 服务应用程序的设计阶段,该应用程序接受 TCP/IP 连接以进行长时间运行的连接(即,这不像 HTTP 那样有许多短连接,而是客户端连接并保持连接数小时或数天甚至数周)。
我正在寻找设计网络架构的最佳方式的想法。我将需要为该服务至少启动一个线程。我正在考虑使用异步 API(BeginRecieve 等),因为我不知道在任何给定时间(可能是数百个)我将连接多少个客户端。我绝对不想为每个连接启动一个线程。
数据将主要从我的服务器流出到客户端,但有时会从客户端发送一些命令。这主要是一个监控应用程序,我的服务器定期向客户端发送状态数据。
使其尽可能可扩展的最佳方法是什么?基本工作流程?
明确地说,我正在寻找基于 .NET 的解决方案(如果可能,C#,但任何 .NET 语言都可以使用)。
我需要一个解决方案的工作示例,作为指向我可以下载的内容的指针或内嵌的简短示例。并且它必须是基于 .NET 和 Windows 的(任何 .NET 语言都是可以接受的)。

最佳答案

我以前写过类似的东西。我多年前的研究表明,使用异步套接字编写自己的套接字实现是最好的选择。这意味着客户没有真正做任何事情实际上需要相对较少的资源。发生的任何事情都由 .NET 线程池处理。
我把它写成一个管理服务器所有连接的类。
我只是使用一个列表来保存所有客户端连接,但是如果您需要更快地查找更大的列表,您可以随心所欲地编写它。

private List<xConnection> _sockets;
此外,您还需要套接字实际监听传入连接。
private System.Net.Sockets.Socket _serverSocket;
start 方法实际上启动服务器套接字并开始监听任何传入连接。
public bool Start()
{
  System.Net.IPHostEntry localhost = System.Net.Dns.GetHostEntry(System.Net.Dns.GetHostName());
  System.Net.IPEndPoint serverEndPoint;
  try
  {
     serverEndPoint = new System.Net.IPEndPoint(localhost.AddressList[0], _port);
  }
  catch (System.ArgumentOutOfRangeException e)
  {
    throw new ArgumentOutOfRangeException("Port number entered would seem to be invalid, should be between 1024 and 65000", e);
  }
  try
  {
    _serverSocket = new System.Net.Sockets.Socket(serverEndPoint.Address.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
   }
   catch (System.Net.Sockets.SocketException e)
   {
      throw new ApplicationException("Could not create socket, check to make sure not duplicating port", e);
    }
    try
    {
      _serverSocket.Bind(serverEndPoint);
      _serverSocket.Listen(_backlog);
    }
    catch (Exception e)
    {
       throw new ApplicationException("An error occurred while binding socket. Check inner exception", e);
    }
    try
    {
       //warning, only call this once, this is a bug in .net 2.0 that breaks if
       // you're running multiple asynch accepts, this bug may be fixed, but
       // it was a major pain in the rear previously, so make sure there is only one
       //BeginAccept running
       _serverSocket.BeginAccept(new AsyncCallback(acceptCallback), _serverSocket);
    }
    catch (Exception e)
    {
       throw new ApplicationException("An error occurred starting listeners. Check inner exception", e);
    }
    return true;
 }
我只想指出异常处理代码看起来很糟糕,但原因是我在那里有异常抑制代码,因此任何异常都会被抑制并返回 false如果设置了配置选项,但为了简洁起见,我想删除它。
上面的 _serverSocket.BeginAccept(new AsyncCallback(acceptCallback)), _serverSocket) 实质上设置了我们的服务器套接字,以便在用户连接时调用 acceptCallback 方法。此方法从 .NET 线程池运行,如果您有许多阻塞操作,它会自动处理创建额外的工作线程。这应该以最佳方式处理服务器上的任何负载。
    private void acceptCallback(IAsyncResult result)
    {
       xConnection conn = new xConnection();
       try
       {
         //Finish accepting the connection
         System.Net.Sockets.Socket s = (System.Net.Sockets.Socket)result.AsyncState;
         conn = new xConnection();
         conn.socket = s.EndAccept(result);
         conn.buffer = new byte[_bufferSize];
         lock (_sockets)
         {
           _sockets.Add(conn);
         }
         //Queue receiving of data from the connection
         conn.socket.BeginReceive(conn.buffer, 0, conn.buffer.Length, SocketFlags.None, new AsyncCallback(ReceiveCallback), conn);
         //Queue the accept of the next incoming connection
         _serverSocket.BeginAccept(new AsyncCallback(acceptCallback), _serverSocket);
       }
       catch (SocketException e)
       {
         if (conn.socket != null)
         {
           conn.socket.Close();
           lock (_sockets)
           {
             _sockets.Remove(conn);
           }
         }
         //Queue the next accept, think this should be here, stop attacks based on killing the waiting listeners
         _serverSocket.BeginAccept(new AsyncCallback(acceptCallback), _serverSocket);
       }
       catch (Exception e)
       {
         if (conn.socket != null)
         {
           conn.socket.Close();
           lock (_sockets)
           {
             _sockets.Remove(conn);
           }
         }
         //Queue the next accept, think this should be here, stop attacks based on killing the waiting listeners
         _serverSocket.BeginAccept(new AsyncCallback(acceptCallback), _serverSocket);
       }
     }
上面的代码基本上刚刚接受了进来的连接,排队BeginReceive这是一个回调,当客户端发送数据时会运行,然后排队下一个 acceptCallback它将接受进入的下一个客户端连接。BeginReceive方法调用是告诉套接字从客户端接收数据时要做什么。对于 BeginReceive ,你需要给它一个字节数组,这是它在客户端发送数据时会复制数据的地方。 ReceiveCallback方法将被调用,这就是我们处理接收数据的方式。
private void ReceiveCallback(IAsyncResult result)
{
  //get our connection from the callback
  xConnection conn = (xConnection)result.AsyncState;
  //catch any errors, we'd better not have any
  try
  {
    //Grab our buffer and count the number of bytes receives
    int bytesRead = conn.socket.EndReceive(result);
    //make sure we've read something, if we haven't it supposadly means that the client disconnected
    if (bytesRead > 0)
    {
      //put whatever you want to do when you receive data here

      //Queue the next receive
      conn.socket.BeginReceive(conn.buffer, 0, conn.buffer.Length, SocketFlags.None, new AsyncCallback(ReceiveCallback), conn);
     }
     else
     {
       //Callback run but no data, close the connection
       //supposadly means a disconnect
       //and we still have to close the socket, even though we throw the event later
       conn.socket.Close();
       lock (_sockets)
       {
         _sockets.Remove(conn);
       }
     }
   }
   catch (SocketException e)
   {
     //Something went terribly wrong
     //which shouldn't have happened
     if (conn.socket != null)
     {
       conn.socket.Close();
       lock (_sockets)
       {
         _sockets.Remove(conn);
       }
     }
   }
 }
编辑:在这种模式中,我忘了在这方面的代码中提到:
//put whatever you want to do when you receive data here

//Queue the next receive
conn.socket.BeginReceive(conn.buffer, 0, conn.buffer.Length, SocketFlags.None, new AsyncCallback(ReceiveCallback), conn);
通常,在任何你想要的代码中,我都会将数据包重新组装成消息,然后将它们创建为线程池上的作业。这样,无论消息处理代码正在运行时,来自客户端的下一个块的 BeginReceive 都不会延迟。
接受回调通过调用结束接收完成读取数据套接字。这会填充开始接收函数中提供的缓冲区。一旦你在我留下评论的地方做任何你想做的事情,我们就会调用下一个 BeginReceive如果客户端发送更多数据,该方法将再次运行回调。
现在是真正棘手的部分:当客户端发送数据时,您的接收回调可能只用部分消息调用。重新组装会变得非常复杂。我使用我自己的方法并创建了一种专有协议(protocol)来做到这一点。我省略了它,但如果你要求,我可以添加它。这个处理程序实际上是我写过的最复杂的一段代码。
public bool Send(byte[] message, xConnection conn)
{
  if (conn != null && conn.socket.Connected)
  {
    lock (conn.socket)
    {
    //we use a blocking mode send, no async on the outgoing
    //since this is primarily a multithreaded application, shouldn't cause problems to send in blocking mode
       conn.socket.Send(bytes, bytes.Length, SocketFlags.None);
     }
   }
   else
     return false;
   return true;
 }
上面的send方法实际上使用了一个同步的Send打电话。由于我的应用程序的消息大小和多线程性质,这对我来说很好。如果你想发送到每个客户端,你只需要遍历 _sockets 列表。
您在上面看到的 xConnection 类基本上是一个简单的套接字包装器,用于包含字节缓冲区,在我的实现中还有一些额外的东西。
public class xConnection : xBase
{
  public byte[] buffer;
  public System.Net.Sockets.Socket socket;
}
此处还可以引用 using s 我包括在内,因为当他们不包括在内时我总是很生气。
using System.Net.Sockets;
我希望这会有所帮助。它可能不是最干净的代码,但它可以工作。代码也有一些细微差别,您应该对更改感到厌烦。对于一个,只有一个 BeginAccept随时调用。曾经有一个非常烦人的 .NET 错误,这是几年前的事了,所以我不记得细节了。
此外,在 ReceiveCallback代码,我们在排队下一次接收之前处理从套接字接收到的任何内容。这意味着对于单个套接字,我们实际上只是在 ReceiveCallback 中。在任何时间点一次,我们不需要使用线程同步。但是,如果您重新排序以在拉取数据后立即调用下一个接收,这可能会快一点,您需要确保正确同步线程。
此外,我砍掉了很多代码,但保留了正在发生的事情的本质。这应该是您设计的良好开端。如果您对此还有任何疑问,请发表评论。

关于c# - 如何编写可扩展的基于 TCP/IP 的服务器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/869744/

相关文章:

c# - 将所有警告视为错误

android - 使用 ConnectivityManager 和 NetworkInfo 避免 NullPointerException

sql - 随机遇到网络路径未找到异常

c# - ComboBox 和 KeyValuePair 列表无法正常工作

c# - 如何从 RichTextBox 中获取显示的文本?

c# - 使用资源管理器

c# - 如何在类库项目中使用 Server.MapPath

c# - 有没有办法可以延迟LINQ语句中的.Where子句?

asp.net - CSS 和缺少常量/变量问题?

java - 我无法从其他设备连接到 Flask 服务器(=不是来自本地主机)