我正在实现一个小型库来使用 System.Net.Sockets.Socket
更轻松。它应该处理任意数量的监听和接收 TCP 套接字,并且实现速度快很重要。
我正在使用 XXXAsync
方法和 CLR ThreadPool
回调库用户的委托(delegate)(例如,每当成功发送消息或接收到某些数据时)。最好该库不会自行启动任何线程。
图书馆的用户可以访问我的Sockets
的界面。用于发送消息或开始接收消息的包装器(在许多其他方法和重载中):
public interface IClientSocket {
// will eventually call Socket.SendAsync
IMessageHandle SendAsync(byte[] buffer, int offset, int length);
// will eventually call Socket.RecieveAsync
void StartReceiveAsync();
}
XXXAsync
方法使用 IO 完成端口。因此,调用这些方法的线程必须保持事件状态,直到操作完成,否则操作将失败并返回 SocketError.OperationAborted
。 (我认为是这样,还是不是?)。对图书馆的用户施加这样的限制是丑陋的并且容易出错。
这里最好的选择是什么?
ThreadPool.QueueUserWorkItem
使用委托(delegate)调用 XXXAsync
方法?那安全吗?我在某处读到,ThreadPool 不会停止具有任何 IOCP 的空闲线程。那会很好,因为它解决了上面的问题。但是对于许多 TCP 连接,它也可能很糟糕。
在这种情况下,很可能每个 ThreadPool 线程都调用了未决的
ReceiveAsync
之一。来电。因此,即使当前工作负载很低并且许多线程处于空闲状态(并且浪费内存),线程池也永远不会缩小。 XXXAsync
的专用线程方法。例如,当一个库用户想要发送数据时,它将一个委托(delegate)放入同步队列,线程弹出它并调用 SendAsync
方法。我不太喜欢这个解决方案,因为它浪费了一个线程,并且在多核机器上,发送只能由一个线程执行。
此外,这两种解决方案都不是最好的,因为它们将调用异步方法的工作传递给另一个线程。可以避免吗?
你怎么看? (谢谢!!)
托马斯
编辑1:
我可以重现
SocketError.OperationAborted
以下测试程序有问题(我认为它是正确的)。编译、启动并 telnet 到端口 127.0.0.1:10000。发送“t”或“T”并等待 > 3 秒。发送“T”时,
ReceiveAsync
调用在 ThreadPool 中完成(工作),使用“t”启动一个新线程,该线程在 3 秒后终止(失败)。using System;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Collections.Generic;
namespace Test {
class Program {
static List<Socket> _referenceToSockets = new List<Socket>();
static void Main(string[] args) {
Thread.CurrentThread.Name = "Main Thread";
// Create a listening socket on Port 10000.
Socket ServerSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
_referenceToSockets.Add(ServerSocket);
var endPoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), 10000);
ServerSocket.Bind(endPoint);
ServerSocket.Listen(50);
// Start listening.
var saeaAccept = new SocketAsyncEventArgs();
saeaAccept.Completed += OnCompleted;
ServerSocket.AcceptAsync(saeaAccept);
Console.WriteLine(String.Format("Listening on {0}.", endPoint));
Console.ReadLine();
}
private static void OnCompleted(object obj, SocketAsyncEventArgs evt) {
var socket = (Socket)obj;
Console.WriteLine(String.Format("Async operation completed: {0}; Error: {1}; Callback-Thread: \"{2}\" ({3} threadpool)", evt.LastOperation, evt.SocketError, Thread.CurrentThread.Name, Thread.CurrentThread.IsThreadPoolThread?"is":"no"));
switch (evt.LastOperation) {
case SocketAsyncOperation.Accept:
// Client connected. Listen for more.
Socket clientSocket = evt.AcceptSocket;
_referenceToSockets.Add(clientSocket);
evt.AcceptSocket = null;
socket.AcceptAsync(evt);
// Start receiving data.
var saeaReceive = new SocketAsyncEventArgs();
saeaReceive.Completed += OnCompleted;
saeaReceive.SetBuffer(new byte[1024], 0, 1024);
clientSocket.ReceiveAsync(saeaReceive);
break;
case SocketAsyncOperation.Disconnect:
socket.Close();
evt.Dispose();
break;
case SocketAsyncOperation.Receive:
if (evt.SocketError != SocketError.Success) {
socket.DisconnectAsync(evt);
return;
}
var asText = Encoding.ASCII.GetString(evt.Buffer, evt.Offset, evt.BytesTransferred);
Console.WriteLine(String.Format("Received: {0} bytes: \"{1}\"", evt.BytesTransferred, asText));
if (evt.BytesTransferred == 0) {
socket.Close();
evt.Dispose();
}
if (asText.ToUpper().StartsWith("T")) {
Action<object> action = (object o) => {
socket.ReceiveAsync(evt);
Console.WriteLine(String.Format("Called ReceiveAsync {0}...", o));
Thread.Sleep(3000);
Console.WriteLine("End of Action...");
};
if (asText.StartsWith("T")) {
ThreadPool.QueueUserWorkItem(o=>action(o), "in ThreadPool");
} else {
new Thread(o=>action(o)).Start("in a new Thread");
}
} else {
socket.ReceiveAsync(evt);
}
break;
}
}
}
}
编辑#3
以下是我打算使用的解决方案:
我让库用户的线程调用
XXXAsync
(SendAsync
除外)直接操作。在大多数情况下,调用会成功(因为调用线程很少会终止)。如果操作失败并返回
SocketError.OperationAborted
,库只是使用来自异步回调的当前线程再次调用操作。这个是ThreadPool
线程并且它很有可能成功(将设置一个附加标志,如果 SocketError.OperationAborted
的原因是由于某些其他错误引起的,则最多使用此解决方法一次)。这应该可以工作,因为套接字本身仍然可以,只是之前的操作失败了。
对于
SendAsync
,此解决方法不起作用,因为它可能会弄乱消息的顺序。在这种情况下,我会将消息排队到 FIFO 列表中。我将使用 ThreadPool
将它们出列并通过 SendAsync
发送它们.
最佳答案
这里有几个危险信号。如果您在保持线程活跃时遇到问题,那么您的线程没有做任何有用的事情。在这种情况下,使用 Async 方法没有意义。如果速度是您唯一关心的问题,那么您不应该使用 Async 方法。他们要求 Socket 抓取一个 tp 线程来进行回调。这是很小的开销,但如果常规线程进行阻塞调用,则没有。
当您需要在处理许多同时连接时使您的应用程序很好地扩展时,您应该只考虑 Async 方法。这会产生非常突发的 cpu 负载,非常适合 tp 线程。
关于c# - 确保调用 Socket.XXXAsync 的线程保持事件状态以完成 IO 请求(IO 完成端口,C#),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/5465807/