c# - 网络流写入被阻止

标签 c# windows tcp networkstream

我正在开发一个接受来自不同客户端的多个 tcp 连接的 c# 应用程序 (.net 4)。有一个接受套接字的 tcp 监听器。双工通信黑白节点。数据使用 Networkstream.Write 方法发送,使用 Networkstream.read 方法读取。为每个 tcp 连接创建一个单独的线程。

问题是,几天前我们注意到其中一个客户端停止读取数据(由于错误)达 20 分钟。由于连接没有中断,服务器上没有 (IO) 异常。但是,我们注意到其他客户端的数据也没有运行。 20 分钟后,该客户端再次开始接收数据,很快其他客户端也开始接收数据。

我知道网络流的写入方法是一种阻塞方法,我们没有使用任何超时。因此,有可能写入已被阻止(描述为 here )。但据我了解,每个 tcp 连接都必须有一个单独的写缓冲区,或者还有更多的东西在起作用。 TCP 连接上的发送阻塞是否会影响同一应用程序中的其他 TCP 连接?

这里是写操作的伪代码。对于每个连接,一个单独的线程都有一个单独的传出队列进程。

public class TCPServerListener : baseConnection
{

    private readonly int _Port;
    private TcpListener _tcpListener;
    private Thread _thread;
    private List<TcpClientData> _tcpClientDataList = new List<TcpClientData>();
    private long _messageDiscardTimeout;
    private bool LoopForClientConnection = true;

    public TCPServerListener(int port, ThreadPriority threadPriority)
    {
        try
        {
            // init property
        }
        catch (Exception ex)
        {
            // log
        }
    }

    public void SendMessageToAll(int type)
    {
        base.EnqueueMessageToSend(type, _tcpClientDataList);
    }
    public void SendMessageToList(int type, IList<TcpClient> tcpClientList)
    {
        base.EnqueueMessageToSend(type, tcpClientList);
    }
    public void SendMessage(int type, TcpClient tcpClient)
    {
        base.EnqueueMessageToSend(type, tcpClient);
    }



    private void AcceptClientConnections()
    {
        while (LoopForClientConnection)
        {
            try
            {
                Socket socket = _tcpListener.AcceptSocket();
                TcpClientData tcpClientData = new TcpClientData();
                tcpClientData.tcpClientThread = new Thread(new ParameterizedThreadStart(StartAsync));
                tcpClientData.tcpClientThread.Priority = _threadPriority;
                tcpClientData.tcpClientThread.IsBackground = true;
                tcpClientData.tcpClientThread.Name = "CD" + tcpClientData.tcpClientThread.ManagedThreadId;
                tcpClientData.tcpClient = new TcpClient();
                tcpClientData.tcpClient.Client = socket;
                _tcpClientDataList.Add(tcpClientData);
                tcpClientData.tcpClientThread.Start(tcpClientData.tcpClient);
            }
            catch (ThreadAbortException ex)
            {
                //log

            }
            catch (Exception ex)
            {
                //log
            }
        }
    }




    public override void Start()
    {
        base.Start();
        _tcpListener = new TcpListener(System.Net.IPAddress.Any, _Port);

        _thread = new Thread(AcceptClientConnections);
        _thread.Priority = _threadPriority;
        _thread.IsBackground = true;

        _tcpListener.Start();
        _thread.Start();
    }

    public override void Stop()
    {
       // stop listener and terminate threads
    }
}


public class baseConnection
{
    private Thread _InCommingThread;
    private Thread _OutGoingThread;
    protected ThreadPriority _threadPriority;
    protected BlockingCollection<MessageReceived> _InComingMessageQueue = new BlockingCollection<MessageReceived>();
    protected BlockingCollection<MessageToSend> _OutgoingMessageQueue = new BlockingCollection<MessageToSend>();

    public void StartAsync(Object oTcpClient)
    {
        TcpClient tcpClient = oTcpClient as TcpClient;
        if (tcpClient == null)
            return;

        using (tcpClient)
        {
            using (NetworkStream stream = tcpClient.GetStream())
            {
                stream.ReadTimeout = Timeout.Infinite;
                stream.WriteTimeout = Timeout.Infinite;

                BinaryReader bodyReader = new BinaryReader(stream);

                while (tcpClient.Connected)
                {
                    try
                    {
                        int messageType = bodyReader.ReadInt32();

                        // checks to verify messages 

                        // enqueue message in incoming queue
                        _InComingMessageQueue.Add(new MessageReceived(messageType, tcpClient));
                    }
                    catch (EndOfStreamException ex)
                    {
                        // log
                        break;
                    }
                    catch (Exception ex)
                    {
                        // log
                        Thread.Sleep(100);
                    }
                }
                //RaiseDisconnected(tcpClient);
            }
        }
    }


    public virtual void Start()
    {
        _InCommingThread = new Thread(HandleInCommingMessnge);
        _InCommingThread.Priority = _threadPriority;
        _InCommingThread.IsBackground = true;
        _InCommingThread.Start();

        _OutGoingThread = new Thread(HandleOutgoingQueue);
        _OutGoingThread.Priority = _threadPriority;
        _OutGoingThread.IsBackground = true;
        _OutGoingThread.Start();
    }


    public virtual void Stop()
    {
       // stop the threads and free up resources
    }

    protected void EnqueueMessageToSend(int type, List<TcpClientData> tcpClientDataList)
    {
        tcpClientDataList.ForEach(x => _OutgoingMessageQueue.Add(new MessageToSend(type, x.tcpClient)));
    }
    protected void EnqueueMessageToSend(int type, IList<TcpClient> tcpClientList)
    {
        foreach (TcpClient tcpClient in tcpClientList)
        {
            _OutgoingMessageQueue.Add(new MessageToSend(type, tcpClient));
        }
    }
    protected void EnqueueMessageToSend(int type, TcpClient tcpClient)
    {
        _OutgoingMessageQueue.Add(new MessageToSend(type, tcpClient));
    }


    private void HandleOutgoingQueue()
    {
        while (true)
        {
            try
            {

                MessageToSend message = _OutgoingMessageQueue.Take();

                if (message.tcpClient.Connected)
                {
                    BinaryWriter writer = new BinaryWriter(message.tcpClient.GetStream());
                    writer.Write(message.type);
                }
            }
            catch (ThreadAbortException ex)
            {
                // log
                return;
            }
            catch (Exception ex)
            {
                //_logger.Error(ex.Message, ex);
            }
        }
    }

    private void HandleInCommingMessnge()
    {
        while (true)
        {
            try
            {
                MessageReceived messageReceived = _InComingMessageQueue.Take();

                // handle message
            }
            catch (ThreadAbortException ex)
            {
                // log
                return;
            }
            catch (Exception ex)
            {
                // log
                //_logger.Error(ex.Message, ex);
            }
        }
    }

    public class MessageReceived
    {
        public MessageReceived(int type, TcpClient tcpClient)
        {
            this.tcpClient = tcpClient;
            this.type = type;
        }

        public int type;
        public TcpClient tcpClient;
    }

    public class MessageToSend
    {
        public MessageToSend(int type, TcpClient tcpClient)
        {
            this.tcpClient = tcpClient;
            this.type = type;
        }

        public int type;
        public TcpClient tcpClient;
    }

    public class TcpClientData
    {
        public Thread tcpClientThread;
        public TcpClient tcpClient;
    }
}

最佳答案

您提到为每个连接创建一个单独的线程,但您显示的代码似乎能够为任何 连接使消息出队。

如果此代码在多个线程上运行,程序将在每个线程当前都尝试向阻塞连接发送消息时立即阻塞。如果此循环在多个线程上运行,您可能会遇到的另一个问题是消息可能不会以正确的顺序到达同一连接。

关于c# - 网络流写入被阻止,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/14251948/

相关文章:

c - C 上 float 数量的错误

c - 程序运行不符合我的预期

c - TCP/UDP 和以太网 MTU 分段

linux - TCP RST 段发送到 peer over socket 的情况有多少?

c# - 如何在 Visual Studio 中逐步执行引用的程序集?

c# - 在 MVC 5 应用程序中记录异常的位置或时间

c# - 接受 EventHandler<T> 和 EventHandler 的方法

c# - 在 C# 中将 decimal[][] 转换为 double[][]

c - Linux 中是否有 API 可以从语言环境中获取完整的语言名称?

windows - HH 的命令行选项和打开 CHM Windows 帮助文件