c++ - 使用 boost::asio::streambuf 缺少第一个字符

标签 c++ boost server

我使用 boost 库用 C++ 制作了一个异步聊天服务器。几乎一切都工作正常。

客户端断开连接有两种方法:

  • 按 Ctrl + C(终止客户端进程)
  • 输入“退出”。

前者还可以。然而,后者有一个问题。如果客户端通过“退出”断开连接,则由另一个客户端发送的下一条消息将显示,而不会出现前几个字符。之后就OK了。

例如:几个客户聊天。其中之一以“退出”断开连接。之后,另一个客户端发送“0123456789abcdefghijk”,所有客户端仅收到:“abcdefghijk”。我不知道问题出在哪里,我猜是streambuf的问题。我发现了类似的问题(几乎相同),但是是在 C# 中。

代码如下:

#include<iostream>
#include<list>
#include<map>
#include<queue>
#include<vector>
#include<cstdlib>
#include<ctime>

#include<boost/thread.hpp>
#include<boost/bind.hpp>
#include<boost/asio.hpp>
#include<boost/asio/ip/tcp.hpp>

using namespace std;
using namespace boost::asio;
using namespace boost::asio::ip;

typedef boost::shared_ptr<tcp::socket> socket_ptr;
typedef boost::shared_ptr<string> string_ptr;
typedef boost::shared_ptr< list<socket_ptr> > clientList_ptr;
typedef boost::shared_ptr< list<string> > nameList_ptr;

const string waitingMsg("Waiting for clients...\n");
const string totalClientsMsg("Total clients: ");
const string errorReadingMsg("Error on reading: ");
const string errorWritingMsg("Error on writing: ");
const int EOF_ERROR_CODE = 2;
const int THREADS = 1;

io_service service;
tcp::acceptor acceptor(service, tcp::endpoint(tcp::v4(), 30001));
boost::mutex mtx;
clientList_ptr clientList(new list<socket_ptr>);
nameList_ptr nameList(new list<string>);
boost::asio::streambuf buff;
time_t timer;

void ready();
void accepting();
void askForName(socket_ptr clientSock, const boost::system::error_code& error);
void receiveName(socket_ptr clientSock, const boost::system::error_code& error,
        std::size_t bytes_transferred);
void identify(socket_ptr clientSock, const boost::system::error_code& error, std::size_t bytes_transferred);
void accepted(socket_ptr clientSock, string_ptr name);
void receiveMessage(socket_ptr clientSock, string_ptr name);
void received(socket_ptr clientSock, string_ptr name, const boost::system::error_code& error,
        std::size_t bytes_transferred);
bool extract(string_ptr message, std::size_t bytes_transferred);
bool clientSentExit(string_ptr clientSock);
void disconnectClient(socket_ptr clientSock, string_ptr name, const boost::system::error_code& error);
void writeMessage(socket_ptr clientSock, string_ptr message);
void responseSent(const boost::system::error_code& error);
void notification(socket_ptr sock, string_ptr name, const string headOfMsg, const string tailOfMsg);


int main(int argc, char* argv[])
{
    try
    {
        vector<boost::shared_ptr<boost::thread> > threads;

        ready();
        for (int i = 0; i < THREADS; i++)
        {
            boost::shared_ptr <boost::thread> t(new boost::thread(boost::bind(&io_service::run, &service)));
            threads.push_back(t);
        }
        for (int i = 0; i < THREADS; i++)
        {
            threads[i]->join();
        }
    }
    catch (std::exception& error)
    {
        cerr << error.what() << endl;
    }
    return 0;
}

void ready()
{
    cout << waitingMsg;
    accepting();
}

void accepting()
{
    socket_ptr clientSock(new tcp::socket(service));
    acceptor.async_accept(*clientSock, boost::bind(&askForName, clientSock, boost::asio::placeholders::error));
}

void askForName(socket_ptr sock, const boost::system::error_code& error)
{
    if (error)
    {
        cerr << "Error on accepting: " << error.message() << endl;
    }
    boost::asio::async_write(*sock, buffer("Please, enter your name:\n"),
            boost::bind(&receiveName, sock, boost::asio::placeholders::error,
                    boost::asio::placeholders::bytes_transferred));

    accepting();
}

void receiveName(socket_ptr sock, const boost::system::error_code& error,
        std::size_t bytes_transferred)
{
    if (error)
    {
        cerr << errorWritingMsg << error.message() << endl;
    }
    boost::asio::async_read_until(*sock, buff, '\n',
            boost::bind(&identify, sock, boost::asio::placeholders::error,
                    boost::asio::placeholders::bytes_transferred));
}

void identify(socket_ptr sock, const boost::system::error_code& error,
        std::size_t bytes_transferred)
{
    if(error)
    {
        if (error.value() != EOF_ERROR_CODE)
        {
            cerr << errorReadingMsg << error.message() << endl;
        }
        return;
    }

    string_ptr name(new string(""));
    if (!extract(name, bytes_transferred))
    {
        return;
    }

    if (find(nameList->begin(), nameList->end(), *name) != nameList->end())
    {
        boost::asio::async_write(*sock, buffer("This name is already in use! Please, select another name:\n"),
                    boost::bind(&receiveName, sock, boost::asio::placeholders::error,
                            boost::asio::placeholders::bytes_transferred));
        return;
    }

    nameList->emplace_back(*name);

    accepted(sock, name);
}

void accepted(socket_ptr sock, string_ptr name)
{
    mtx.lock();
    clientList->emplace_back(sock);
    mtx.unlock();
    notification(sock, name, "New client: ", " joined. ");

    receiveMessage(sock, name);
}

void receiveMessage(socket_ptr sock, string_ptr name)

{
    boost::asio::async_read_until(*sock, buff, '\n', boost::bind(&received, sock, name, boost::asio::placeholders::error,
              boost::asio::placeholders::bytes_transferred));
}

void received(socket_ptr sock, string_ptr name, const boost::system::error_code& error,
        std::size_t bytes_transferred)
{
    if(error)
    {
        if (error.value() != EOF_ERROR_CODE)
        {
            cerr << errorReadingMsg << error.message() << endl;
        }
        disconnectClient(sock, name, error);
        return;
    }

    if(!clientList->empty())
    {
        //mtx.lock();

        string_ptr message(new string(""));

        if(!extract(message, bytes_transferred))
        {
            //mtx.unlock();
            disconnectClient(sock, name, error);
            return;
        }

        *message = *name + ": " + *message + "\n";
        cout << "ChatLog: " << *message << endl;

        writeMessage(sock, message);
        receiveMessage(sock, name);

        //mtx.unlock();
    }
}

bool extract(string_ptr message, std::size_t bytes_transferred)
{
    mtx.lock();
    buff.commit(bytes_transferred);
    std::istream istrm(&buff);
    //mtx.unlock();

    std::getline(istrm, *message);
    buff.consume(buff.size());

    string_ptr msgEndl(new string(*message + "\n"));
    mtx.unlock();
    if(clientSentExit(msgEndl))
    {
        return false;
    }
    return true;
}

bool clientSentExit(string_ptr message)
{
    return message->compare(0, 5, "exit\n") == 0;
}

void disconnectClient(socket_ptr sock, string_ptr name, const boost::system::error_code& error)
{
    boost::system::error_code ec = error;
    auto position = find(clientList->begin(), clientList->end(), sock);
    auto namePos = find(nameList->begin(), nameList->end(), *name);

    sock->shutdown(tcp::socket::shutdown_both, ec);
    if (ec)
    {
        cerr << "Error on shutting: " << ec.message() << endl;
    }
    sock->close(ec);
    if(ec)
    {
        cerr << "Error on closing: " << ec.message() << endl;
    }
    clientList->erase(position);
    nameList->erase(namePos);

    notification(sock, name, "", " disconnected. ");
}

void writeMessage(socket_ptr sock, string_ptr message)
{
    for(auto& cliSock : *clientList)
    {
        if (cliSock->is_open() && cliSock != sock)
        {
            boost::asio::async_write(*cliSock, buffer(*message),
                    boost::bind(&responseSent, boost::asio::placeholders::error));
        }
    }
}

void responseSent(const boost::system::error_code& error)
{
    if (error)
    {
        cerr << "Error on writing: " << error.message() << endl;
    }
}

void notification(socket_ptr sock, string_ptr name, const string headOfMsg, const string tailOfMsg)
{
    string_ptr serviceMsg (new string (headOfMsg + *name + tailOfMsg));
    cout << *serviceMsg << totalClientsMsg << clientList->size() << endl;

    *serviceMsg = *serviceMsg + "\n";
    writeMessage(sock, serviceMsg);

    cout << waitingMsg;
}

有趣的是,我有类似的同步服务器,具有相同的streambuf使用方式,但没有这样的问题。

最佳答案

boost::asio::async_read_until() 可以在\n 之后读取任意数量的字符到 Streambuf。然后它给出 bytes_transferred,它是第一行中的字符数(不一定是读取到缓冲区的字符数)。请参阅documentation .

只要保持缓冲区变量不变,接下来 boost::asio::async_read_until() 将首先从缓冲区读取字符,然后从套接字读取字符。

在我看来,您使用 getline() 从缓冲区读取了一行,这是正确的。之后,您调用

buff.consume(buff.size());

这会清除缓冲区,删除有关您可能收到的部分行的所有信息。您读取的第一个完整行已被 getline() 从缓冲区中删除,因此在任何情况下都不需要调用 Consumer()。

仅删除 Consumer() 调用并不能解决您的问题,因为您似乎使用在所有客户端之间共享的一个缓冲区,并且您不知道哪些部分数据来自哪个客户端。一种可能的解决方案是创建一个缓冲区列表(每个客户端一个),就像您有一个套接字列表一样。然后 boost::asio::async_read_until() 和 getline() 将负责处理部分数据,您不必考虑这一点。

关于c++ - 使用 boost::asio::streambuf 缺少第一个字符,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33629567/

相关文章:

swift - 使用 Swift 将图像上传到服务器时出错

java - 使用java的视频服务器

c++ - 应用程序不同部分的不同语言

c++ - C++ 中的 Bron Kerbosch 算法

c++ - boost::regex 中的嵌套量词

c++ - 如何在 boost 累加器中使用/访问用户参数?

c++ - Boost.asio async_read_some编译错误

c++ - 使用 epoll 处理多个并发请求

c++ - sfml 编译错误 : NULL was not declared in this scope

c++ - 为什么我的空间哈希这么慢?