c# - C#多线程聊天服务器,处理断开连接

原文 标签 c# multithreading server client chat

我正在寻找一种处理断开连接的方法,因为每次关闭客户端时,服务器都会停止工作。我收到一条错误消息,在此行中“无法读取超出流的末尾”:

string message = reader.ReadString();


另外,我还需要一种从客户端列表中删除断开连接的客户端的方法。
这是我的代码:
服务器

using System;
using System.Threading;
using System.Net.Sockets;
using System.IO;
using System.Net;
using System.Collections.Generic;

namespace Server
{
    class Server
    {
    public static List<TcpClient> clients = new List<TcpClient>();

    static void Main(string[] args)
    {
        IPAddress ip = IPAddress.Parse("127.0.0.1");
        TcpListener ServerSocket = new TcpListener(ip, 14000);
        ServerSocket.Start();

        Console.WriteLine("Server started.");
        while (true)
        {
            TcpClient clientSocket = ServerSocket.AcceptTcpClient();
            clients.Add(clientSocket);
            handleClient client = new handleClient();
            client.startClient(clientSocket);
        }
    }
}

public class handleClient
{
    TcpClient clientSocket;
    public void startClient(TcpClient inClientSocket)
    {
        this.clientSocket = inClientSocket;
        Thread ctThread = new Thread(Chat);
        ctThread.Start();
    }

    private void Chat()
    {
        while (true)
        {
            BinaryReader reader = new BinaryReader(clientSocket.GetStream());
            while (true)
            {
                string message = reader.ReadString();
                foreach (var client in Server.clients)
                {
                    BinaryWriter writer = new BinaryWriter(client.GetStream());
                    writer.Write(message);
                }
            }
        }
    }
}
}


客户

using System;
using System.Net.Sockets;
using System.IO;
using System.Threading;

namespace Client
{
   class Client
   {
       public static void Write()
       {
        TcpClient client = new TcpClient("127.0.0.1", 14000);
        while (true)
        {
            string str = Console.ReadLine();
            BinaryWriter writer = new BinaryWriter(client.GetStream());
            writer.Write(str);
        }
    }

    public static void Read()
    {
        TcpClient client = new TcpClient("127.0.0.1", 14000);
        while (true)
        {
            BinaryReader reader = new BinaryReader(client.GetStream());
            Console.WriteLine(reader.ReadString());
        }
    }

    static void Main(string[] args)
    {
        Thread Thread = new Thread(Write);
        Thread Thread2 = new Thread(Read);
        Thread.Start();
        Thread2.Start();
    }
}
}

最佳答案

每次关闭客户端时,服务器都会停止工作。我收到一条错误消息,即“无法读取流的末尾”


从某种意义上说,这是完全正常的。也就是说,使用BinaryReader时,其正常行为是在到达流末尾时抛出EndOfStreamException

为什么它到达了流的末端?好吧,因为客户端断开了连接,这就是流发生的情况。在套接字级别,真正发生的是读取操作以0作为读取的字节数完成。这表明客户端已正常关闭套接字,并且将不再发送任何数据。

在.NET API中,这转换为NetworkStream的末尾,TcpClient用于包装实际上正在处理网络I / O的Socket对象。而该NetworkStream对象又被您的BinaryReader对象包装。当BinaryReader到达流的末尾时,将引发该异常。

请注意,您的代码实际上并未为用户提供关闭客户端的优美方法。他们将必须使用Ctrl + C或直接终止该进程。使用前者具有正常关闭套接字的偶然效果,但这仅是因为.NET正在处理进程的终止并在您的对象(例如用于连接到服务器的TcpClient对象)上运行终结器,以及终结器调用Socket.Shutdown()告知服务器正在关闭。

如果您要终止该进程(例如使用任务管理器),则会发现抛出了IOException。良好的网络代码应始终准备就绪以查看IOException;网络不可靠,确实会发生故障。您想做一些合理的事情,例如从连接中删除远程端点,而不只是使整个程序崩溃。

现在,所有这些,仅仅是因为EndOfStreamException是“正常的”,这并不意味着您发布的代码是,或者在任何方面都是进行网络编程的正确方法的示例。您有许多问题:


没有明确的优雅关闭。
网络I / O提供了一种关闭连接的常规方法,该方法包括在两个端点上进行握手以指示何时完成发送以及何时完成接收。一个端点将指示已完成发送;另一个会注意到这一点(使用上面提到的0字节读取),然后本身表明它已完成发送和接收。
TcpClientNetworkStream不会直接公开此内容,但是您可以使用TcpClient.Client属性来获取Socket对象以进行更好的平滑关闭,即一个端点可以表明它已完成发送,并且仍然能够等待另一个端点也完成发送。
使用TcpClient.Close()方法断开连接就像在不说“再见”的情况下挂断电话。使用Socket.Shutdown()就像礼貌地打完电话“好吧,这就是我想说的……还有其他吗?”
您正在使用BinaryReader,但未正确处理EndOfStreamException
您的客户端使用两个连接与服务器进行通信。
网络I / O使用Socket对象,该对象支持全双工通信。无需创建第二个连接即可进行读取和写入。单个连接就足够了,并且更好,因为当您将发送和接收分成两个连接时,您还需要向协议中添加一些内容,以便服务器知道这两个连接代表单个客户端(您的代码实际上并不会这样做) )。
客户端断开连接时,它不会从服务器列表中删除(您在问题中记下了此内容)。
客户端列表不是线程安全的。
您的Chat()方法中有一个额外的“ while(true)”。


我已经修改了您的原始示例,以解决上述所有问题,在此介绍了这些示例:

服务器Program.cs:

class Program
{
    private static readonly object _lock = new object();
    private static readonly List<TcpClient> clients = new List<TcpClient>();

    public static TcpClient[] GetClients()
    {
        lock (_lock) return clients.ToArray();
    }

    public static int GetClientCount()
    {
        lock (_lock) return clients.Count;
    }

    public static void RemoveClient(TcpClient client)
    {
        lock (_lock) clients.Remove(client);
    }

    static void Main(string[] args)
    {
        IPAddress ip = IPAddress.Parse("127.0.0.1");
        TcpListener ServerSocket = new TcpListener(ip, 14000);
        ServerSocket.Start();

        Console.WriteLine("Server started.");
        while (true)
        {
            TcpClient clientSocket = ServerSocket.AcceptTcpClient();
            Console.WriteLine($"client connected: {clientSocket.Client.RemoteEndPoint}");
            lock (_lock) clients.Add(clientSocket);
            handleClient client = new handleClient();
            client.startClient(clientSocket);

            Console.WriteLine($"{GetClientCount()} clients connected");
        }
    }
}


服务器handleClient.cs:

public class handleClient
{
    TcpClient clientSocket;

    public void startClient(TcpClient inClientSocket)
    {
        this.clientSocket = inClientSocket;
        Thread ctThread = new Thread(Chat);
        ctThread.Start();
    }

    private void Chat()
    {
        BinaryReader reader = new BinaryReader(clientSocket.GetStream());

        try
        {
            while (true)
            {
                string message = reader.ReadString();
                foreach (var client in Program.GetClients())
                {
                    BinaryWriter writer = new BinaryWriter(client.GetStream());
                    writer.Write(message);
                }
            }
        }
        catch (EndOfStreamException)
        {
            Console.WriteLine($"client disconnecting: {clientSocket.Client.RemoteEndPoint}");
            clientSocket.Client.Shutdown(SocketShutdown.Both);
        }
        catch (IOException e)
        {
            Console.WriteLine($"IOException reading from {clientSocket.Client.RemoteEndPoint}: {e.Message}");
        }

        clientSocket.Close();
        Program.RemoveClient(clientSocket);
        Console.WriteLine($"{Program.GetClientCount()} clients connected");
    }
}


客户端Program.cs:

class Program
{
    private static readonly object _lock = new object();
    private static bool _closed;

    public static void Write(TcpClient client)
    {
        try
        {
            string str;
            SocketShutdown reason = SocketShutdown.Send;

            while ((str = Console.ReadLine()) != "")
            {
                lock (_lock)
                {
                    BinaryWriter writer = new BinaryWriter(client.GetStream());
                    writer.Write(str);

                    if (_closed)
                    {
                        // Remote endpoint already said they are done sending,
                        // so we're done with both sending and receiving.
                        reason = SocketShutdown.Both;
                        break;
                    }
                }
            }

            client.Client.Shutdown(reason);
        }
        catch (IOException e)
        {
            Console.WriteLine($"IOException writing to socket: {e.Message}");
        }
    }

    public static void Read(TcpClient client)
    {
        try
        {
            while (true)
            {
                try
                {
                    BinaryReader reader = new BinaryReader(client.GetStream());
                    Console.WriteLine(reader.ReadString());
                }
                catch (EndOfStreamException)
                {
                    lock (_lock)
                    {
                        _closed = true;
                        return;
                    }
                }
            }
        }
        catch (IOException e)
        {
            Console.WriteLine($"IOException reading from socket: {e.Message}");
        }
    }

    static void Main(string[] args)
    {
        TcpClient client = new TcpClient("127.0.0.1", 14000);
        Thread writeThread = new Thread(() => Write(client));
        Thread readThread = new Thread(() => Read(client));
        writeThread.Start();
        readThread.Start();

        writeThread.Join();
        readThread.Join();

        client.Close();
        Console.WriteLine("client exiting");
    }
}


请注意,在大多数情况下,我没有解决您在代码中使用的不一致和非常规的命名。唯一的例外是客户端代码中的线程变量,因为我真的不喜欢与类型名称完全匹配的大写局部变量。

您还有其他一些问题,上面的代码修订版没有解决。这些包括:


您正在使用BinaryReader。这在很多方面都是令人讨厌的类。我建议,特别是对于无论如何仅处理文本的聊天服务器方案,都应切换为使用StreamReader / StreamWriter
存在不正确的耦合/关注点分离。您的Program类具有服务器代码,并且服务器代码了解Program类。最好将服务器和客户端实现都封装到自己的类中,与程序的主要入口点分开,并进一步将顶级服务器代码与每个客户端数据结构分离(使用C#的< cc>允许将重要事件通知顶级服务器代码,例如需要从列表中删除客户端,而不必让每个客户端数据结构必须真正了解顶级服务器对象,请注意其客户清单)。
您应该提供一种机制来正常关闭服务器。


通常,我会说这些不在这样的答案范围之内,答案已经很长了。我已经解决了您的代码中紧迫的问题,然后再解决了一些问题,这名义上就足够了。

但是,我一直想写几年前写的basic network programming示例的更新版本,作为一种“中间”示例,添加了多个客户端支持,异步操作并使用了最新的C#功能(如event / async)。因此,我继续并花了一些时间来做到这一点。我想我最终会将它发布到我的博客中……这是另一个项目。同时,这就是该代码(请注意,这是一个完全从头开始的示例……这样做比尝试重新编写您的代码更有意义)……

此实现的大部分繁琐工作都在服务器和客户端共享的单个类中:

/// <summary>
/// Represents a remote end-point for the chat server and clients
/// </summary>
public sealed class ConnectedEndPoint : IDisposable
{
    private readonly object _lock = new object();
    private readonly Socket _socket;
    private readonly StreamReader _reader;
    private readonly StreamWriter _writer;
    private bool _closing;

    /// <summary>
    /// Gets the address of the connected remote end-point
    /// </summary>
    public IPEndPoint RemoteEndPoint { get { return (IPEndPoint)_socket.RemoteEndPoint; } }

    /// <summary>
    /// Gets a <see cref="Task"/> representing the on-going read operation of the connection
    /// </summary>
    public Task ReadTask { get; }

    /// <summary>
    /// Connect to an existing remote end-point (server) and return the
    /// <see cref="ConnectedEndPoint"/> object representing the new connection
    /// </summary>
    /// <param name="remoteEndPoint">The address of the remote end-point to connect to</param>
    /// <param name="readCallback">The callback which will be called when a line of text is read from the newly-created connection</param>
    /// <returns></returns>
    public static ConnectedEndPoint Connect(IPEndPoint remoteEndPoint, Action<ConnectedEndPoint, string> readCallback)
    {
        Socket socket = new Socket(SocketType.Stream, ProtocolType.Tcp);

        socket.Connect(remoteEndPoint);

        return new ConnectedEndPoint(socket, readCallback);
    }

    /// <summary>
    /// Asynchronously accept a new connection from a remote end-point
    /// </summary>
    /// <param name="listener">The listening <see cref="Socket"/> which will accept the connection</param>
    /// <param name="readCallback">The callback which will be called when a line of text is read from the newly-created connection</param>
    /// <returns></returns>
    public static async Task<ConnectedEndPoint> AcceptAsync(Socket listener, Action<ConnectedEndPoint, string> readCallback)
    {
        Socket clientSocket = await Task.Factory.FromAsync(listener.BeginAccept, listener.EndAccept, null);

        return new ConnectedEndPoint(clientSocket, readCallback);
    }

    /// <summary>
    /// Write a line of text to the connection, sending it to the remote end-point
    /// </summary>
    /// <param name="text">The line of text to write</param>
    public void WriteLine(string text)
    {
        lock (_lock)
        {
            if (!_closing)
            {
                _writer.WriteLine(text);
                _writer.Flush();
            }
        }
    }

    /// <summary>
    /// Initiates a graceful closure of the connection
    /// </summary>
    public void Shutdown()
    {
        _Shutdown(SocketShutdown.Send);
    }

    /// <summary>
    /// Implements <see cref="IDisposable.Dispose"/>
    /// </summary>
    public void Dispose()
    {
        _reader.Dispose();
        _writer.Dispose();
        _socket.Close();
    }

    /// <summary>
    /// Constructor. Private -- use one of the factory methods to create new connections.
    /// </summary>
    /// <param name="socket">The <see cref="Socket"/> for the new connection</param>
    /// <param name="readCallback">The callback for reading lines on the new connection</param>
    private ConnectedEndPoint(Socket socket, Action<ConnectedEndPoint, string> readCallback)
    {
        _socket = socket;
        Stream stream = new NetworkStream(_socket);
        _reader = new StreamReader(stream, Encoding.UTF8, false, 1024, true);
        _writer = new StreamWriter(stream, Encoding.UTF8, 1024, true);

        ReadTask = _ConsumeSocketAsync(readCallback);
    }

    private void _Shutdown(SocketShutdown reason)
    {
        lock (_lock)
        {
            if (!_closing)
            {
                _socket.Shutdown(reason);
                _closing = true;
            }
        }
    }

    private async Task _ConsumeSocketAsync(Action<ConnectedEndPoint, string> callback)
    {
        string line;

        while ((line = await _reader.ReadLineAsync()) != null)
        {
            callback(this, line);
        }

        _Shutdown(SocketShutdown.Both);
    }
}


客户程序将直接使用该类。服务器端封装在另一个类中,该类与上述相同的DLL中存在:

/// <summary>
/// Event arguments for the <see cref="ChatServer.Status"/> event
/// </summary>
public class StatusEventArgs : EventArgs
{
    /// <summary>
    /// Gets the status text
    /// </summary>
    public string StatusText { get; }

    /// <summary>
    /// Constructor
    /// </summary>
    /// <param name="statusText">The status text</param>
    public StatusEventArgs(string statusText)
    {
        StatusText = statusText;
    }
}

/// <summary>
/// A server implementing a simple line-based chat server
/// </summary>
public class ChatServer
{
    private readonly object _lock = new object();
    private readonly Socket _listener;
    private readonly List<ConnectedEndPoint> _clients = new List<ConnectedEndPoint>();
    private bool _closing;

    /// <summary>
    /// Gets a task representing the listening state of the servdere
    /// </summary>
    public Task ListenTask { get; }

    /// <summary>
    /// Raised when the server has status to report
    /// </summary>
    public event EventHandler<StatusEventArgs> Status;

    /// <summary>
    /// Constructor
    /// </summary>
    /// <param name="port">The port number the server should listen on</param>
    public ChatServer(int port)
    {
        _listener = new Socket(SocketType.Stream, ProtocolType.Tcp);
        _listener.Bind(new IPEndPoint(IPAddress.Any, port));
        _listener.Listen(int.MaxValue);
        ListenTask = _ListenAsync();
    }

    /// <summary>
    /// Initiates a shutdown of the chat server.
    /// </summary>
    /// <remarks>This method closes the listening socket, which will subsequently
    /// cause the listening task to inform any connected clients that the server
    /// is shutting down, and to wait for the connected clients to finish a graceful
    /// closure of their connections.
    /// </remarks>
    public void Shutdown()
    {
        _listener.Close();
    }

    private async Task _ListenAsync()
    {
        try
        {
            while (true)
            {
                ConnectedEndPoint client = await ConnectedEndPoint.AcceptAsync(_listener, _ClientReadLine);

                _AddClient(client);
                _CleanupClientAsync(client);
            }
        }
        catch (ObjectDisposedException)
        {
            _OnStatus("Server's listening socket closed");
        }
        catch (IOException e)
        {
            _OnStatus($"Listening socket IOException: {e.Message}");
        }

        await _CleanupServerAsync();
    }

    private async Task _CleanupServerAsync()
    {
        ConnectedEndPoint[] clients;

        lock (_lock)
        {
            _closing = true;
            clients = _clients.ToArray();
        }

        foreach (ConnectedEndPoint client in clients)
        {
            try
            {
                client.WriteLine("Chat server is shutting down");
            }
            catch (IOException e)
            {
                _OnClientException(client, e.Message);
            }
            client.Shutdown();
        }

        // Clients are expected to participate in graceful closure. If they do,
        // this will complete when all clients have acknowledged the shutdown.
        // In a real-world program, may be a good idea to include a timeout in
        // case of network issues or misbehaving/crashed clients. Implementing
        // the timeout is beyond the scope of this proof-of-concept demo code.
        try
        {
            await Task.WhenAll(clients.Select(c => c.ReadTask));
        }
        catch (AggregateException)
        {
            // Actual exception for each client will have already
            // been reported by _CleanupClientAsync()
        }
    }

    // Top-level "clean-up" method, which will observe and report all exceptions
    // In real-world code, would probably want to simply log any unexpected exceptions
    // to a log file and then exit the process. Here, we just exit after reporting
    // exception info to caller. In either case, there's no need to observe a Task from
    // this method, and async void simplifies the call (no need to receive and then ignore
    // the Task object just to keep the compiler quiet).
    private async void _CleanupClientAsync(ConnectedEndPoint client)
    {
        try
        {
            await client.ReadTask;
        }
        catch (IOException e)
        {
            _OnClientException(client, e.Message);
        }
        catch (Exception e)
        {
            // Unexpected exceptions are programmer-error. They could be anything, and leave
            // the program in an unknown, possibly corrupt state. The only reasonable disposition
            // is to log, then exit.
            //
            // Full stack-trace, because who knows what this exception was. Will need the
            // stack-trace to do any diagnostic work.
            _OnStatus($"Unexpected client connection exception. {e}");
            Environment.Exit(1);
        }
        finally
        {
            _RemoveClient(client);
            client.Dispose();
        }
    }

    private void _ClientReadLine(ConnectedEndPoint readClient, string text)
    {
        _OnStatus($"Client {readClient.RemoteEndPoint}: \"{text}\"");

        lock (_lock)
        {
            if (_closing)
            {
                return;
            }

            text = $"{readClient.RemoteEndPoint}: {text}";

            foreach (ConnectedEndPoint client in _clients.Where(c => c != readClient))
            {
                try
                {
                    client.WriteLine(text);
                }
                catch (IOException e)
                {
                    _OnClientException(client, e.Message);
                }
            }
        }
    }

    private void _AddClient(ConnectedEndPoint client)
    {
        lock (_lock)
        {
            _clients.Add(client);
            _OnStatus($"added client {client.RemoteEndPoint} -- {_clients.Count} clients connected");
        }
    }

    private void _RemoveClient(ConnectedEndPoint client)
    {
        lock (_lock)
        {
            _clients.Remove(client);
            _OnStatus($"removed client {client.RemoteEndPoint} -- {_clients.Count} clients connected");
        }
    }

    private void _OnStatus(string statusText)
    {
        Status?.Invoke(this, new StatusEventArgs(statusText));
    }

    private void _OnClientException(ConnectedEndPoint client, string message)
    {
        _OnStatus($"Client {client.RemoteEndPoint} IOException: {message}");
    }
}


而且,在大多数情况下,这就是您所需要的。上面的DLL代码由两个不同的程序(服务器和客户端)引用(在我的示例中)。

这是服务器:

class Program
{
    private const int _kportNumber = 5678;

    static void Main(string[] args)
    {
        ChatServer server = new ChatServer(_kportNumber);

        server.Status += (s, e) => WriteLine(e.StatusText);

        Task serverTask = _WaitForServer(server);

        WriteLine("Press return to shutdown server...");
        ReadLine();

        server.Shutdown();
        serverTask.Wait();
    }

    private static async Task _WaitForServer(ChatServer server)
    {
        try
        {
            await server.ListenTask;
        }
        catch (Exception e)
        {
            WriteLine($"Server exception: {e}");
        }
    }
}


这是客户:

class Program
{
    private const int _kportNumber = 5678;

    static void Main(string[] args)
    {
        IPEndPoint remoteEndPoint = new IPEndPoint(IPAddress.Loopback, _kportNumber);
        ConnectedEndPoint server = ConnectedEndPoint.Connect(remoteEndPoint, (c, s) => WriteLine(s));

        _StartUserInput(server);
        _SafeWaitOnServerRead(server).Wait();
    }

    private static void _StartUserInput(ConnectedEndPoint server)
    {
        // Get user input in a new thread, so main thread can handle waiting
        // on connection.
        new Thread(() =>
        {
            try
            {
                string line;

                while ((line = ReadLine()) != "")
                {
                    server.WriteLine(line);
                }

                server.Shutdown();
            }
            catch (IOException e)
            {
                WriteLine($"Server {server.RemoteEndPoint} IOException: {e.Message}");
            }
            catch (Exception e)
            {
                WriteLine($"Unexpected server exception: {e}");
                Environment.Exit(1);
            }
        })
        {
            // Setting IsBackground means this thread won't keep the
            // process alive. So, if the connection is closed by the server,
            // the main thread can exit and the process as a whole will still
            // be able to exit.
            IsBackground = true
        }.Start();
    }

    private static async Task _SafeWaitOnServerRead(ConnectedEndPoint server)
    {
        try
        {
            await server.ReadTask;
        }
        catch (IOException e)
        {
            WriteLine($"Server {server.RemoteEndPoint} IOException: {e.Message}");
        }
        catch (Exception e)
        {
            // Should never happen. It's a bug in this code if it does.
            WriteLine($"Unexpected server exception: {e}");
        }
    }
}


在我看来,上面要提醒您的最重要的事情之一是awaitConnectedEndPoint类对使用它们的类具有零依赖性。通过使用回调委托和事件,依赖于这些类的代码可以双向交互,而无需这些支持类就不必知道代码所驻留的类型(请参阅“控制反转”,这是一个变体)的)。

使代码关系看起来像只有单向引用的树的内容越多,编写代码以及以后维护代码的难度就越大。

注意:为了说明起见,我同时使用了事件和回调委托。两种方法都可以很好地发挥作用。主要的权衡是复杂性与灵活性。使用事件使代码更加灵活-可以根据需要添加和删除事件处理程序-但是,如果使用带有ChatServersender参数的方法签名的.NET约定来实现事件,则它“比较繁琐” -weight”,而不是在创建相关对象时仅传递一个简单的回调委托。我在代码中放置了每个示例,您可以决定在哪种情况下首选哪种方法。

您还将注意到,以上内容大量使用了C#的异步功能。起初,这可能会使代码看起来更难阅读。但实际上,使用这些功能使所有功能正常工作要比我尝试使用较旧的EventArgs / BeginXXX()方法容易得多,或者,如果禁止,请为每个连接专用一个线程(这非常可扩展)随着客户人数的增加而变差)。用这种方式来思考固有的异步操作(例如网络I / O)绝对值得。

相关文章:

python - 在python本地主机服务器上进行测试会将我引导到目录

c# - 使用存储库以工作模式为单位的.Net依赖项注入(inject)

c# - 为什么COM Interop将VB6 bool(boolean) 值视为C#短?

c# - NUnit。值在索引[0]处不同

c++ - 在OpenMP中加入阵列结果

javascript - node.js服务于可访问node.js服务器上资源的网页

c# - Visual Studio Code Ubuntu-在标记输入时,模式失败

python - 如何让这个线程等待队列退出?

multithreading - 最佳编程方法/方法以确保线程安全

c - 客户端断开连接后是否可以从客户端套接字读取?