c# - 是否可以将异步接收器用于未知数量的接收字节?

标签 c# sockets asynchronous

我的应用程序基于C#同步套接字构建,为了提高效率(如果可能),我试图用异步套接字替换它们。到目前为止,我已经在服务器上用异步的(基于MSDN文档)替换了Send()和Accept(),但是在实现Receive()时遇到了一些问题。

原始(同步):

Socket _socket = new Socket(SocketType.Stream, ProtocolType.Tcp);
bool _flag = true;
CheckFun()
{
   while (_flag)
   {
      Thread.Sleep(10); // every ~10ms check
      byte[] _buffer;
      if (_socket.Available != 0)
      {
          _buffer = new byte[1]; // just check 1st byte
          _socket.Receive(_buffer, 1, SocketFlags.None);
          if (_buffer[0] == 1)
          {
              ReceiveFun();
          }
      }
   }
}

ReceiveFun()
{    
   int hdrSize = 12;
   byte[] buffer = new byte[hdrSize];   
   _socket.Receive(buffer, hdrSize, SocketFlags.None);
   int _dataLength = buffer[0]; // the 1st byte has already been removed before calling the ReceiveFun()
   buffer = new byte[_dataLength]; 
   int _bytesRead = 0;
   while (_bytesRead != _dataLength)
   {
        while (_socket.Available == 0)
        {
             if (!_socket.Connected)
                 return 0;
        }
        _bytesRead += _socket.Receive(buffer, _bytesRead, _dataLength - _bytesRead, SocketFlags.None);
   }
   //read bytes blah blah...
}

我的问题是如何将这一操作转换为异步操作,并将接收到的字节连接起来,直到我收到所有信息?然后再次等待下一个?

编辑

异​​步

public class State
{
    public int nToReadBytes = 0;
    public int nBytesRead = 0;
    public const int BufferSize = 1024;         
    public byte[] buffer = new byte[BufferSize];     // Receive buffer.     
}

List<byte> lReceivedBytes = new List<byte>();
int hdrSize = 12;
public void ReadCallback(IAsyncResult ar)
{
     // Retrieve the state object and the handler socket  
    // from the asynchronous state object.  
    var state = ar.AsyncState as State;  

   // Read data from the client socket.   
   int availableBytes = oSocket.EndReceive(ar);

   if (availableBytes > 0)
   {
       if (lReceivedBytes.Count == 0)
       {
            if (state.buffer[0] == 1)
            { 
                 // the first field of the header has been successfully decoded
                 if (availableBytes > 1)
                 {
                     state.nToReadBytes = BitConverter.ToInt32(state.buffer, 1) + hdrSize;
                     int _bytesCopy = Math.Min(state.nToReadBytes, state.buffer.Length); //in case that the data is less than the State.BufferSize (unlikely)
                     state.nBytesRead += _bytesCopy;
                     lReceivedBytes.AddRange(state.buffer);
                 }
            }  
            else if (state.buffer[0] == 2)
            { 
                 // the first field of the header has been successfully decoded but do nothing!
                 _socket.BeginReceive(state.buffer, 0, State.BufferSize, 0, new AsyncCallback(ReadCallback), state);
                 return;
            }  
            else
                throw new InvalidDataException("Invalid hdr field [1-2]: " + state.buffer[0]);  
       }
       else
       {
            state.nBytesRead += state.buffer.Length;
            lReceivedBytes.AddRange(state.buffer);         
       }

       if (lReceivedBytes.Count == state.nToReadBytes)
       { 
           //read all information and clear list and States in the end (?)
           //   ...
           lReceivedBytes.Clear();
           state.nToReadBytes = 0;
           state.nBytesRead = 0;
           _socket.BeginReceive(state.buffer, 0, State.BufferSize, 0, new AsyncCallback(ReadCallback), state);
       }
       else
       {
            //int _newSize = Math.Min(state.nToReadBytes - state.nBytesRead, State.BufferSize); // for restriction (?)      
            int _newSize = state.nToReadBytes - state.nBytesRead; // for now don't check 
            _socket.BeginReceive(state.buffer, 0, _newSize, 0, new AsyncCallback(ReadCallback), state); //shall I increase the size (it could be between 90 kB - 170kB, until all info is received)
       }
    }
    else
       _socket.BeginReceive(state.buffer, 0, State.BufferSize, 0, new AsyncCallback(ReadCallback), state);
 }

最佳答案

Socket类具有两个主要的异步范例:原始的基于回调的异步编程模型(APM)和仅稍作更新的基于事件的异步模式(EAP)。与同步方法相比,这两种方法都难以实现,因为它们要求您将思想调整为连接状态,而不仅仅是本地方法逻辑,将单个方法分解为处理初始化和完成的部分的操作。

幸运的是,更新的任务并行库模型利用asyncawait允许异步代码以与等效同步版本几乎相同的方式编写。对于Socket类,要利用此优势,您需要以与TPL兼容的方式包装其异步API。 .NET确实提供了一种通用方法,可以采用现有的APM API并将它们包装在任务中(请参阅https://docs.microsoft.com/en-us/dotnet/standard/parallel-programming/tpl-and-traditional-async-programming),但是恕我直言,利用NetworkStream类更为简单,该类将套接字包装在Stream对象中。

由于Stream已经收到了“TPL之爱”,即具有ReceiveAsync()和类似的方法来允许基于TPL的操作,因此,我发现这比处理旨在将APM映射到TPL的包装方法要容易一些。

在您的代码中,最终看起来像这样:

// Somewhere appropriate -- your code example isn't specific or complete enough
// to offer anything more detailed than this
NetworkStream _stream = new NetworkStream(_socket);

async Task ReceiveFun()
{    
    int _bytesRead = 0, hdrSize = 12;
    byte[] buffer = new byte[hdrSize];

    while (_bytesRead < hdrSize)
    {
        int bytesRead = await _stream.ReadAsync(buffer, _bytesRead, hdrSize - _bytesRead);

        if (bytesRead == 0) throw new InvalidDataException("unexpected end-of-stream");
        _bytesRead += bytesRead;
    }

    int _dataLength = buffer[0]; // the 1st byte has already been removed before calling the ReceiveFun()
    buffer = new byte[_dataLength]; 

    _bytesRead = 0;
    while (_bytesRead < _dataLength)
    {
        int bytesRead = await _stream.ReadAsync(buffer, _bytesRead, _dataLength - _bytesRead);

        if (bytesRead == 0) throw new InvalidDataException("unexpected end-of-stream");
        _bytesRead += bytesRead;
   }
   //read bytes blah blah...
}

请注意,您的原始同步代码有一个错误:读取 header 时未使用返回的字节数,因此无法知道完整的 header 实际上是在单个调用中返回的。您必须始终查看读取的字节数。当有数据可用时,它总是可以少到1;而当到达流的末尾时(即,远程端点使用关闭操作),当然它将为0。

您的代码也没有正确检查流的末尾。当您尝试从套接字读取时,只需查看返回的字节。我也已在上面修复了该问题。

关于c# - 是否可以将异步接收器用于未知数量的接收字节?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58998398/

相关文章:

android - 使用 AsyncTask 发送 Android 邮件

c++ - MySQL异步?

.net - 是否有 Control.BeginInvoke 的变体可以在句柄销毁之前/之后工作?

c# - Linq 从另一个区间列表中选择任何区间内的所有数字

python - 使用 Python TAP 设备嗅探 ICMP 数据包(例如 ping echo 请求)

sockets - Rabbitmq 监听器未连接到另一台机器

c - 将套接字绑定(bind)到 ansi c 中的端口 80

c# - 为什么相互比较 2 个 .NET 框架类会导致 stackoverflow 异常?

c# - 将 ICollectionView 转换为 List<T>

c# - URL 中存在特殊字符的 Web Api 2 路由问题