我使用 boost 库用 C++ 制作了一个异步聊天服务器。几乎一切都工作正常。
客户端断开连接有两种方法:
- 按 Ctrl + C(终止客户端进程)
- 输入“退出”。
前者还可以。然而,后者有一个问题。如果客户端通过“退出”断开连接,则由另一个客户端发送的下一条消息将显示,而不会出现前几个字符。之后就OK了。
例如:几个客户聊天。其中之一以“退出”断开连接。之后,另一个客户端发送“0123456789abcdefghijk”,所有客户端仅收到:“abcdefghijk”。我不知道问题出在哪里,我猜是streambuf的问题。我发现了类似的问题(几乎相同),但是是在 C# 中。
代码如下:
#include<iostream>
#include<list>
#include<map>
#include<queue>
#include<vector>
#include<cstdlib>
#include<ctime>
#include<boost/thread.hpp>
#include<boost/bind.hpp>
#include<boost/asio.hpp>
#include<boost/asio/ip/tcp.hpp>
using namespace std;
using namespace boost::asio;
using namespace boost::asio::ip;
typedef boost::shared_ptr<tcp::socket> socket_ptr;
typedef boost::shared_ptr<string> string_ptr;
typedef boost::shared_ptr< list<socket_ptr> > clientList_ptr;
typedef boost::shared_ptr< list<string> > nameList_ptr;
const string waitingMsg("Waiting for clients...\n");
const string totalClientsMsg("Total clients: ");
const string errorReadingMsg("Error on reading: ");
const string errorWritingMsg("Error on writing: ");
const int EOF_ERROR_CODE = 2;
const int THREADS = 1;
io_service service;
tcp::acceptor acceptor(service, tcp::endpoint(tcp::v4(), 30001));
boost::mutex mtx;
clientList_ptr clientList(new list<socket_ptr>);
nameList_ptr nameList(new list<string>);
boost::asio::streambuf buff;
time_t timer;
void ready();
void accepting();
void askForName(socket_ptr clientSock, const boost::system::error_code& error);
void receiveName(socket_ptr clientSock, const boost::system::error_code& error,
std::size_t bytes_transferred);
void identify(socket_ptr clientSock, const boost::system::error_code& error, std::size_t bytes_transferred);
void accepted(socket_ptr clientSock, string_ptr name);
void receiveMessage(socket_ptr clientSock, string_ptr name);
void received(socket_ptr clientSock, string_ptr name, const boost::system::error_code& error,
std::size_t bytes_transferred);
bool extract(string_ptr message, std::size_t bytes_transferred);
bool clientSentExit(string_ptr clientSock);
void disconnectClient(socket_ptr clientSock, string_ptr name, const boost::system::error_code& error);
void writeMessage(socket_ptr clientSock, string_ptr message);
void responseSent(const boost::system::error_code& error);
void notification(socket_ptr sock, string_ptr name, const string headOfMsg, const string tailOfMsg);
int main(int argc, char* argv[])
{
try
{
vector<boost::shared_ptr<boost::thread> > threads;
ready();
for (int i = 0; i < THREADS; i++)
{
boost::shared_ptr <boost::thread> t(new boost::thread(boost::bind(&io_service::run, &service)));
threads.push_back(t);
}
for (int i = 0; i < THREADS; i++)
{
threads[i]->join();
}
}
catch (std::exception& error)
{
cerr << error.what() << endl;
}
return 0;
}
void ready()
{
cout << waitingMsg;
accepting();
}
void accepting()
{
socket_ptr clientSock(new tcp::socket(service));
acceptor.async_accept(*clientSock, boost::bind(&askForName, clientSock, boost::asio::placeholders::error));
}
void askForName(socket_ptr sock, const boost::system::error_code& error)
{
if (error)
{
cerr << "Error on accepting: " << error.message() << endl;
}
boost::asio::async_write(*sock, buffer("Please, enter your name:\n"),
boost::bind(&receiveName, sock, boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
accepting();
}
void receiveName(socket_ptr sock, const boost::system::error_code& error,
std::size_t bytes_transferred)
{
if (error)
{
cerr << errorWritingMsg << error.message() << endl;
}
boost::asio::async_read_until(*sock, buff, '\n',
boost::bind(&identify, sock, boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
}
void identify(socket_ptr sock, const boost::system::error_code& error,
std::size_t bytes_transferred)
{
if(error)
{
if (error.value() != EOF_ERROR_CODE)
{
cerr << errorReadingMsg << error.message() << endl;
}
return;
}
string_ptr name(new string(""));
if (!extract(name, bytes_transferred))
{
return;
}
if (find(nameList->begin(), nameList->end(), *name) != nameList->end())
{
boost::asio::async_write(*sock, buffer("This name is already in use! Please, select another name:\n"),
boost::bind(&receiveName, sock, boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
return;
}
nameList->emplace_back(*name);
accepted(sock, name);
}
void accepted(socket_ptr sock, string_ptr name)
{
mtx.lock();
clientList->emplace_back(sock);
mtx.unlock();
notification(sock, name, "New client: ", " joined. ");
receiveMessage(sock, name);
}
void receiveMessage(socket_ptr sock, string_ptr name)
{
boost::asio::async_read_until(*sock, buff, '\n', boost::bind(&received, sock, name, boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
}
void received(socket_ptr sock, string_ptr name, const boost::system::error_code& error,
std::size_t bytes_transferred)
{
if(error)
{
if (error.value() != EOF_ERROR_CODE)
{
cerr << errorReadingMsg << error.message() << endl;
}
disconnectClient(sock, name, error);
return;
}
if(!clientList->empty())
{
//mtx.lock();
string_ptr message(new string(""));
if(!extract(message, bytes_transferred))
{
//mtx.unlock();
disconnectClient(sock, name, error);
return;
}
*message = *name + ": " + *message + "\n";
cout << "ChatLog: " << *message << endl;
writeMessage(sock, message);
receiveMessage(sock, name);
//mtx.unlock();
}
}
bool extract(string_ptr message, std::size_t bytes_transferred)
{
mtx.lock();
buff.commit(bytes_transferred);
std::istream istrm(&buff);
//mtx.unlock();
std::getline(istrm, *message);
buff.consume(buff.size());
string_ptr msgEndl(new string(*message + "\n"));
mtx.unlock();
if(clientSentExit(msgEndl))
{
return false;
}
return true;
}
bool clientSentExit(string_ptr message)
{
return message->compare(0, 5, "exit\n") == 0;
}
void disconnectClient(socket_ptr sock, string_ptr name, const boost::system::error_code& error)
{
boost::system::error_code ec = error;
auto position = find(clientList->begin(), clientList->end(), sock);
auto namePos = find(nameList->begin(), nameList->end(), *name);
sock->shutdown(tcp::socket::shutdown_both, ec);
if (ec)
{
cerr << "Error on shutting: " << ec.message() << endl;
}
sock->close(ec);
if(ec)
{
cerr << "Error on closing: " << ec.message() << endl;
}
clientList->erase(position);
nameList->erase(namePos);
notification(sock, name, "", " disconnected. ");
}
void writeMessage(socket_ptr sock, string_ptr message)
{
for(auto& cliSock : *clientList)
{
if (cliSock->is_open() && cliSock != sock)
{
boost::asio::async_write(*cliSock, buffer(*message),
boost::bind(&responseSent, boost::asio::placeholders::error));
}
}
}
void responseSent(const boost::system::error_code& error)
{
if (error)
{
cerr << "Error on writing: " << error.message() << endl;
}
}
void notification(socket_ptr sock, string_ptr name, const string headOfMsg, const string tailOfMsg)
{
string_ptr serviceMsg (new string (headOfMsg + *name + tailOfMsg));
cout << *serviceMsg << totalClientsMsg << clientList->size() << endl;
*serviceMsg = *serviceMsg + "\n";
writeMessage(sock, serviceMsg);
cout << waitingMsg;
}
有趣的是,我有类似的同步服务器,具有相同的streambuf使用方式,但没有这样的问题。
最佳答案
boost::asio::async_read_until() 可以在\n 之后读取任意数量的字符到 Streambuf。然后它给出 bytes_transferred,它是第一行中的字符数(不一定是读取到缓冲区的字符数)。请参阅documentation .
只要保持缓冲区变量不变,接下来 boost::asio::async_read_until() 将首先从缓冲区读取字符,然后从套接字读取字符。
在我看来,您使用 getline() 从缓冲区读取了一行,这是正确的。之后,您调用
buff.consume(buff.size());
这会清除缓冲区,删除有关您可能收到的部分行的所有信息。您读取的第一个完整行已被 getline() 从缓冲区中删除,因此在任何情况下都不需要调用 Consumer()。
仅删除 Consumer() 调用并不能解决您的问题,因为您似乎使用在所有客户端之间共享的一个缓冲区,并且您不知道哪些部分数据来自哪个客户端。一种可能的解决方案是创建一个缓冲区列表(每个客户端一个),就像您有一个套接字列表一样。然后 boost::asio::async_read_until() 和 getline() 将负责处理部分数据,您不必考虑这一点。
关于c++ - 使用 boost::asio::streambuf 缺少第一个字符,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33629567/