c# - 在使用 TPL 的非常密集的应用程序中,将异步套接字并行化,而不仅仅是并发化

标签 c# sockets parallel-processing task-parallel-library

我正在编写一个使用 Socket 的应用程序,它会非常密集,然后我真的需要使用我们大型服务器中的每个内核。我在 stackoverflow 中看到问题 ( how to using ThreadPool to run socket thread parallel? ) 只有一个答案指向这个 MSDN Sample .

但我认为重点仅在于如何使其并发而不是并行,这里有人问How cpu intensive is opening a socket它看起来非常密集,这里有人告诉它没有帮助 TPL TaskFactory.FromAsync vs Tasks with blocking methods有人在这里教如何使用 TaskFactory.FromAsync ( Is there a pattern for wrapping existing BeginXXX/EndXXX async methods into async tasks? )。

如何保持套接字操作的并行性和高性能,以及如何处理套接字 断开连接、半连接套接字和消息边界等问题在正常的异步方式中是令人头疼的。把TPL和Task放在一起怎么处理。

最佳答案

看到:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace skttool
{
    public class StateObject
    {
        public Socket workSocket = null;
        public const int BufferSize = 1024;
        public byte[] buffer = new byte[BufferSize];
        public int bytesRead = 0;
        public StringBuilder sb = new StringBuilder();
    }

    public class tool
    {
        //-------------------------------------------------
        private ManualResetEvent evtConnectionDone = new ManualResetEvent(false);
        private Socket skttool = null;
        private bool running = false;
        private StateObject state = null;
        //-------------------------------------------------
        toolConfig _cfg;
        public tool(toolConfig cfg)
        {
            _cfg = cfg;
        }
        //-------------------------------------------------
        public void socketListeningSet()
        {
            IPEndPoint localEndPoint;
            Socket skttool;
            byte[] bytes = new Byte[1024];
            localEndPoint = new IPEndPoint(IPAddress.Any, _cfg.addressPort);
            skttool = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
            skttool.Bind(localEndPoint);
            skttool.Listen(_cfg.maxQtdSockets);
        }
        //-------------------------------------------------
        public void start()
        {
            running = true;
            Task T1 = Task.Factory.StartNew(socketListeningSet);
            T1.ContinueWith(prev =>
            {
                while (running)
                {
                    evtConnectionDone.Reset();
                    Task<Socket> accepetChunk = Task<Socket>.Factory.FromAsync(
                                                                       skttool.BeginAccept,
                                                                       skttool.EndAccept,
                                                                       accept,
                                                                       skttool,
                                                                       TaskCreationOptions.AttachedToParent);
                    accepetChunk.ContinueWith(accept, TaskContinuationOptions.NotOnFaulted | TaskCreationOptions.AttachedToParent);
                    evtConnectionDone.WaitOne();
                }
            });
        }
        //-------------------------------------------------
        void accept(Task<Socket> accepetChunk)
        {
            state = new StateObject();
            evtConnectionDone.Set();
            state.workSocket = accepetChunk.Result;
            Task<int> readChunk = Task<int>.Factory.FromAsync(
                                                       state.workSocket.BeginReceive,
                                                       state.workSocket.EndReceive,
                                                       state.buffer,
                                                       state.bytesRead,
                                                       state.buffer.Length - state.bytesRead,
                                                       null,
                                                       TaskCreationOptions.AttachedToParent);
            readChunk.ContinueWith(read, TaskContinuationOptions.NotOnFaulted | TaskCreationOptions.AttachedToParent);
        }
        //-------------------------------------------------
        void read(Task<int> readChunk)
        {
            state.bytesRead += readChunk.Result;
            if (readChunk.Result > 0 && state.bytesRead < state.buffer.Length)
            {
                read();
                return;
            }
            _data = doTask(_data);
            Task<int> sendChunk = Task<int>.Factory.FromAsync(
                                                       state.workSocket.BeginSend,
                                                       state.workSocket.EndSend,
                                                       state.buffer,
                                                       state.bytesRead,
                                                       state.buffer.Length - state.bytesRead,
                                                       null,
                                                       TaskCreationOptions.AttachedToParent);
            sendChunk.ContinueWith(send, TaskContinuationOptions.NotOnFaulted | TaskCreationOptions.AttachedToParent);
        }
        //-------------------------------------------------
        void send(Task<int> readChunk)
        {
            state.workSocket.Shutdown(SocketShutdown.Both);
            state.workSocket.Close();
        }
        //-------------------------------------------------
        byte[] doTask(byte[] data)
        {
            return Array.Reverse(data);
        }
        //-------------------------------------------------
    }
}

关于c# - 在使用 TPL 的非常密集的应用程序中,将异步套接字并行化,而不仅仅是并发化,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/5834755/

相关文章:

c# - C# 中的命名约定 - 下划线

c - 消息不会在服务器端打印 - 套接字

linux - 如何使用一个或多个特定内核执行应用程序?

python - 进程池执行器 : member variable lost in returned objects

c# - 在代码隐藏中从 Page_Load 调用 javascript 函数

c# - 如何在任意网格上找到顶点的邻居?

c# - 页面加载后运行 Javascript

sockets - 如果未连接,如何退出服务器连接

java套接字问题连接被对等方重置

c++ - OpenMP 和内核/线程