我正在编写一个使用 Socket 的应用程序,它的负载会非常高,因此我确实需要用上我们大型服务器的每一个内核。我在 Stack Overflow 上看到过这个问题 ( how to using ThreadPool to run socket thread parallel? ),它只有一个答案,指向这个 MSDN 示例。
但我认为那个示例的重点仅在于如何实现并发,而不是并行。这里有人问 How cpu intensive is opening a socket,看起来这一操作的开销非常大;这里有人说这样做没有帮助 ( TPL TaskFactory.FromAsync vs Tasks with blocking methods );还有人在这里讲解如何使用 TaskFactory.FromAsync ( Is there a pattern for wrapping existing BeginXXX/EndXXX async methods into async tasks? )。
如何让套接字操作既保持并行又保持高性能?套接字断开连接、半连接套接字和消息边界等问题,在普通的异步方式下就已经令人头疼;把 TPL 和 Task 结合起来之后,又该如何处理这些问题?
最佳答案
请参见以下代码:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace skttool
{
/// <summary>
/// Mutable per-connection bookkeeping: the client socket, its receive
/// buffer, how much of that buffer has been filled so far, and a text
/// accumulator.
/// </summary>
public class StateObject
{
    /// <summary>Capacity, in bytes, of <see cref="buffer"/>.</summary>
    public const int BufferSize = 1024;

    /// <summary>Socket of the connection this state belongs to; null until assigned.</summary>
    public Socket workSocket;

    /// <summary>Receive buffer, allocated once with <see cref="BufferSize"/> bytes.</summary>
    public byte[] buffer = new byte[BufferSize];

    /// <summary>Number of bytes stored in <see cref="buffer"/> so far; starts at zero.</summary>
    public int bytesRead;

    /// <summary>Text accumulator; starts empty.</summary>
    public StringBuilder sb = new StringBuilder();
}
/// <summary>
/// TCP server built on the TPL: accepts connections asynchronously, reads one
/// request per connection, transforms it with <see cref="doTask"/>, sends the
/// result back and closes the connection.
/// </summary>
/// <remarks>
/// A request ends when the peer shuts down its sending side or when
/// <see cref="StateObject.BufferSize"/> bytes have arrived, whichever comes
/// first. Every connection gets its own <see cref="StateObject"/>, so
/// concurrent clients never share receive state.
/// </remarks>
public class tool
{
    //-------------------------------------------------
    // Signalled by the accept continuation so the accept loop can issue the next BeginAccept.
    private readonly ManualResetEvent evtConnectionDone = new ManualResetEvent(false);
    // Listening socket; assigned by socketListeningSet().
    private Socket skttool = null;
    // Written by start()/stop() and read by the accept loop on another thread.
    private volatile bool running = false;
    //-------------------------------------------------
    toolConfig _cfg;

    /// <summary>Creates a server that listens according to <paramref name="cfg"/>.</summary>
    /// <param name="cfg">Supplies the listening port (addressPort) and listen backlog (maxQtdSockets).</param>
    public tool(toolConfig cfg)
    {
        _cfg = cfg;
    }

    //-------------------------------------------------
    /// <summary>
    /// Creates the listening socket, binds it to the configured port on all
    /// IPv4 interfaces and starts listening.
    /// </summary>
    public void socketListeningSet()
    {
        IPEndPoint localEndPoint = new IPEndPoint(IPAddress.Any, _cfg.addressPort);
        // Build the socket in a local and publish it to the field only once it is
        // fully set up; a failed Bind/Listen must not leak the half-initialised socket.
        Socket listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
        try
        {
            listener.Bind(localEndPoint);
            listener.Listen(_cfg.maxQtdSockets);
        }
        catch
        {
            listener.Close();
            throw;
        }
        skttool = listener;
    }

    //-------------------------------------------------
    /// <summary>
    /// Starts listening and accepting connections in the background. Returns
    /// immediately; does nothing if the server is already running.
    /// </summary>
    public void start()
    {
        if (running)
        {
            return;
        }
        running = true;
        Task T1 = Task.Factory.StartNew(socketListeningSet);
        T1.ContinueWith(prev =>
        {
            if (reportFault(prev, "listen"))
            {
                running = false;
                return;
            }
            // Work on a local so a later restart cannot swap the socket under this loop.
            Socket listener = skttool;
            try
            {
                while (running)
                {
                    evtConnectionDone.Reset();
                    Task<Socket> acceptChunk = Task<Socket>.Factory.FromAsync(
                        listener.BeginAccept,
                        listener.EndAccept,
                        null);
                    acceptChunk.ContinueWith(done => accept(done));
                    // Wait until the pending accept has completed (successfully or not)
                    // before issuing the next one.
                    evtConnectionDone.WaitOne();
                }
            }
            catch (ObjectDisposedException)
            {
                // stop() closed the listener between the running check and BeginAccept.
            }
            finally
            {
                listener.Close();
            }
        }, TaskContinuationOptions.LongRunning); // the loop blocks, so keep it off the shared pool threads
    }

    //-------------------------------------------------
    /// <summary>
    /// Stops accepting new connections and closes the listening socket.
    /// Connections that are already being served run to completion.
    /// </summary>
    public void stop()
    {
        running = false;
        Socket listener = skttool;
        if (listener != null)
        {
            // Closing the listener faults the pending accept, which wakes the accept loop.
            listener.Close();
        }
        evtConnectionDone.Set();
    }

    //-------------------------------------------------
    /// <summary>Completion handler for an accept: starts receiving on the new connection.</summary>
    void accept(Task<Socket> accepetChunk)
    {
        // Release the accept loop first and unconditionally; otherwise a failed
        // accept would leave the loop blocked forever.
        evtConnectionDone.Set();
        if (reportFault(accepetChunk, "accept"))
        {
            return;
        }
        StateObject state = new StateObject();
        state.workSocket = accepetChunk.Result;
        beginRead(state);
    }

    //-------------------------------------------------
    /// <summary>Issues one asynchronous receive into the unused tail of the connection's buffer.</summary>
    void beginRead(StateObject state)
    {
        Task<int> readChunk;
        try
        {
            // BeginReceive takes more arguments than the FromAsync method-group
            // overloads accept, so it is wrapped in a lambda.
            readChunk = Task<int>.Factory.FromAsync(
                (callback, asyncState) => state.workSocket.BeginReceive(
                    state.buffer,
                    state.bytesRead,
                    state.buffer.Length - state.bytesRead,
                    SocketFlags.None,
                    callback,
                    asyncState),
                state.workSocket.EndReceive,
                null);
        }
        catch (SocketException)
        {
            closeConnection(state);
            return;
        }
        catch (ObjectDisposedException)
        {
            closeConnection(state);
            return;
        }
        readChunk.ContinueWith(done => read(done, state));
    }

    //-------------------------------------------------
    /// <summary>
    /// Completion handler for a receive: keeps reading until the request is
    /// complete, then processes it and sends the reply.
    /// </summary>
    void read(Task<int> readChunk, StateObject state)
    {
        if (reportFault(readChunk, "receive"))
        {
            closeConnection(state);
            return;
        }
        int received = readChunk.Result;
        state.bytesRead += received;
        // Zero bytes means the peer finished sending; a full buffer also ends the request.
        if (received > 0 && state.bytesRead < state.buffer.Length)
        {
            beginRead(state);
            return;
        }
        // Hand doTask only the bytes that were actually received.
        byte[] request = new byte[state.bytesRead];
        Buffer.BlockCopy(state.buffer, 0, request, 0, state.bytesRead);
        byte[] reply = doTask(request);
        Task<int> sendChunk;
        try
        {
            sendChunk = Task<int>.Factory.FromAsync(
                (callback, asyncState) => state.workSocket.BeginSend(
                    reply,
                    0,
                    reply.Length,
                    SocketFlags.None,
                    callback,
                    asyncState),
                state.workSocket.EndSend,
                null);
        }
        catch (SocketException)
        {
            closeConnection(state);
            return;
        }
        catch (ObjectDisposedException)
        {
            closeConnection(state);
            return;
        }
        sendChunk.ContinueWith(done => send(done, state));
    }

    //-------------------------------------------------
    /// <summary>Completion handler for the send: the exchange is over, so close the connection.</summary>
    void send(Task<int> sendChunk, StateObject state)
    {
        reportFault(sendChunk, "send");
        closeConnection(state);
    }

    //-------------------------------------------------
    /// <summary>Shuts down and closes a connection, tolerating a peer that is already gone.</summary>
    void closeConnection(StateObject state)
    {
        try
        {
            state.workSocket.Shutdown(SocketShutdown.Both);
        }
        catch (SocketException)
        {
            // The connection is already broken; there is nothing left to shut down.
        }
        catch (ObjectDisposedException)
        {
            // The socket was closed earlier.
        }
        finally
        {
            state.workSocket.Close();
        }
    }

    //-------------------------------------------------
    /// <summary>
    /// Returns true when <paramref name="task"/> did not run to completion.
    /// Reads the task's exception so the fault counts as observed, and traces
    /// it while the server is running (faults after stop() are expected teardown).
    /// </summary>
    bool reportFault(Task task, string stage)
    {
        if (task.Status == TaskStatus.RanToCompletion)
        {
            return false;
        }
        AggregateException error = task.Exception;
        if (error != null && running)
        {
            System.Diagnostics.Trace.TraceError(
                "skttool: {0} failed: {1}", stage, error.GetBaseException().Message);
        }
        return true;
    }

    //-------------------------------------------------
    /// <summary>Reverses <paramref name="data"/> in place and returns that same array.</summary>
    byte[] doTask(byte[] data)
    {
        // Array.Reverse returns void, so the array itself is the result.
        Array.Reverse(data);
        return data;
    }
    //-------------------------------------------------
}
}
关于c# - 在使用 TPL 的非常密集的应用程序中,将异步套接字并行化,而不仅仅是并发化,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/5834755/