c# - 确保调用 Socket.XXXAsync 的线程保持事件状态以完成 IO 请求(IO 完成端口,C#)

标签 c# .net multithreading sockets asynchronous

我正在实现一个小型库来使用 System.Net.Sockets.Socket更轻松。它应该处理任意数量的监听和接收 TCP 套接字,并且实现速度快很重要。

我正在使用 XXXAsync方法和 CLR ThreadPool回调库用户的委托(delegate)(例如,每当成功发送消息或接收到某些数据时)。最好该库不会自行启动任何线程。

图书馆的用户可以访问我的Sockets的界面。用于发送消息或开始接收消息的包装器(在许多其他方法和重载中):

public interface IClientSocket {
    // will eventually call Socket.SendAsync
    IMessageHandle SendAsync(byte[] buffer, int offset, int length);

    // will eventually call Socket.RecieveAsync
    void StartReceiveAsync();
}
XXXAsync方法使用 IO 完成端口。因此,调用这些方法的线程必须保持事件状态,直到操作完成,否则操作将失败并返回 SocketError.OperationAborted。 (我认为是这样,还是不是?)。
对图书馆的用户施加这样的限制是丑陋的并且容易出错。

这里最好的选择是什么?
  • 调用ThreadPool.QueueUserWorkItem使用委托(delegate)调用 XXXAsync方法?那安全吗?我在某处读到,ThreadPool 不会停止具有任何 IOCP 的空闲线程。那会很好,因为它解决了上面的问题。
    但是对于许多 TCP 连接,它也可能很糟糕。
    在这种情况下,很可能每个 ThreadPool 线程都调用了未决的 ReceiveAsync 之一。来电。因此,即使当前工作负载很低并且许多线程处于空闲状态(并且浪费内存),线程池也永远不会缩小。
  • 启动一个始终处于事件状态并调用 XXXAsync 的专用线程方法。例如,当一个库用户想要发送数据时,它将一个委托(delegate)放入同步队列,线程弹出它并调用 SendAsync方法。
    我不太喜欢这个解决方案,因为它浪费了一个线程,并且在多核机器上,发送只能由一个线程执行。

  • 此外,这两种解决方案都不是最好的,因为它们将调用异步方法的工作传递给另一个线程。可以避免吗?

    你怎么看? (谢谢!!)

    托马斯

    编辑1:

    我可以重现 SocketError.OperationAborted以下测试程序有问题(我认为它是正确的)。
    编译、启动并 telnet 到端口 127.0.0.1:10000。发送“t”或“T”并等待 > 3 秒。发送“T”时,ReceiveAsync调用在 ThreadPool 中完成(工作),使用“t”启动一个新线程,该线程在 3 秒后终止(失败)。
    using System;
    using System.Net;
    using System.Net.Sockets;
    using System.Text;
    using System.Threading;
    using System.Collections.Generic;
    
    namespace Test {
        class Program {
            static List<Socket> _referenceToSockets = new List<Socket>();
            static void Main(string[] args) {
                Thread.CurrentThread.Name = "Main Thread";
    
                // Create a listening socket on Port 10000.
                Socket ServerSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
                _referenceToSockets.Add(ServerSocket);
                var endPoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), 10000);
                ServerSocket.Bind(endPoint);
                ServerSocket.Listen(50);
    
                // Start listening.
                var saeaAccept = new SocketAsyncEventArgs();
                saeaAccept.Completed += OnCompleted;
                ServerSocket.AcceptAsync(saeaAccept);
    
                Console.WriteLine(String.Format("Listening on {0}.", endPoint));
                Console.ReadLine();
            }
    
            private static void OnCompleted(object obj, SocketAsyncEventArgs evt) {
                var socket = (Socket)obj;
                Console.WriteLine(String.Format("Async operation completed: {0}; Error: {1}; Callback-Thread: \"{2}\" ({3} threadpool)", evt.LastOperation, evt.SocketError, Thread.CurrentThread.Name, Thread.CurrentThread.IsThreadPoolThread?"is":"no"));
                switch (evt.LastOperation) {
                    case SocketAsyncOperation.Accept:
                        // Client connected. Listen for more.
                        Socket clientSocket = evt.AcceptSocket;
                        _referenceToSockets.Add(clientSocket);
                        evt.AcceptSocket = null;
                        socket.AcceptAsync(evt);
    
                        // Start receiving data.
                        var saeaReceive = new SocketAsyncEventArgs();
                        saeaReceive.Completed += OnCompleted;
                        saeaReceive.SetBuffer(new byte[1024], 0, 1024);
                        clientSocket.ReceiveAsync(saeaReceive);
                        break;
                    case SocketAsyncOperation.Disconnect:
                        socket.Close();
                        evt.Dispose();
                        break;
                    case SocketAsyncOperation.Receive:
                        if (evt.SocketError != SocketError.Success) {
                            socket.DisconnectAsync(evt);
                            return;
                        }
                        var asText = Encoding.ASCII.GetString(evt.Buffer, evt.Offset, evt.BytesTransferred);
                        Console.WriteLine(String.Format("Received: {0} bytes: \"{1}\"", evt.BytesTransferred, asText));
                        if (evt.BytesTransferred == 0) {
                            socket.Close();
                            evt.Dispose();
                        }
                        if (asText.ToUpper().StartsWith("T")) {
                            Action<object> action = (object o) => {
                                socket.ReceiveAsync(evt);
                                Console.WriteLine(String.Format("Called ReceiveAsync {0}...", o));
                                Thread.Sleep(3000);
                                Console.WriteLine("End of Action...");
                            };
                            if (asText.StartsWith("T")) {
                                ThreadPool.QueueUserWorkItem(o=>action(o), "in ThreadPool");
                            } else {
                                new Thread(o=>action(o)).Start("in a new Thread");
                            }
                        } else {
                            socket.ReceiveAsync(evt);
                        }
                        break;
                }
            }
        }
    }
    

    编辑#3

    以下是我打算使用的解决方案:

    我让库用户的线程调用 XXXAsync (SendAsync 除外)直接操作。在大多数情况下,调用会成功(因为调用线程很少会终止)。
    如果操作失败并返回 SocketError.OperationAborted ,库只是使用来自异步回调的当前线程再次调用操作。这个是ThreadPool线程并且它很有可能成功(将设置一个附加标志,如果 SocketError.OperationAborted 的原因是由于某些其他错误引起的,则最多使用此解决方法一次)。
    这应该可以工作,因为套接字本身仍然可以,只是之前的操作失败了。

    对于SendAsync ,此解决方法不起作用,因为它可能会弄乱消息的顺序。在这种情况下,我会将消息排队到 FIFO 列表中。我将使用 ThreadPool将它们出列并通过 SendAsync 发送它们.

    最佳答案

    这里有几个危险信号。如果您在保持线程活跃时遇到问题,那么您的线程没有做任何有用的事情。在这种情况下,使用 Async 方法没有意义。如果速度是您唯一关心的问题,那么您不应该使用 Async 方法。他们要求 Socket 抓取一个 tp 线程来进行回调。这是很小的开销,但如果常规线程进行阻塞调用,则没有。

    当您需要在处理许多同时连接时使您的应用程序很好地扩展时,您应该只考虑 Async 方法。这会产生非常突发的 cpu 负载,非常适合 tp 线程。

    关于c# - 确保调用 Socket.XXXAsync 的线程保持事件状态以完成 IO 请求(IO 完成端口,C#),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/5465807/

    相关文章:

    c# - 通过引用传递的多个线程访问的静态方法

    c# - SQL 程序集和 SQL 注入(inject)

    java - 如何为多线程程序设置环境变量?

    multithreading - Vala:当加入的线程未被引用时读取无效

    c# - XNA 倍数立方体

    c# - 相当于Lambda的C#

    c# - 如何在库异常中包含附加信息

    c# - 服务中托管的 WCF 的交互式 "screen"

    python - 多次调用时如何停止长时间运行的函数?

    c# - dailymotion 和 blip.tv 的视频 Url 解析器