我正在写一个高效的套接字服务器。目的是获得良好的总体吞吐量。我将主线程用作侦听器。它async_accept
一个客户端,并将套接字添加到队列中。有一个调度程序线程从队列中拾取一个套接字,可以从中读取该套接字,并将其添加到工作线程的队列之一中。我保留了一组工作线程。一个工作线程将进行实际的读/写。
我在监听器中使用async_accept
。为了找出准备读取哪个套接字,我在调度程序中使用了async_read_some。这个想法可行,但是有问题。我的io_service.run()
在侦听器中被调用,因此调度程序中async_read_some
的处理程序实际上在侦听器线程中运行。
这是我的代码:
using boost::asio::ip::tcp;
using namespace std;
std::queue<std::shared_ptr<tcp::socket>> q_sock;
boost::mutex m_log1;
boost::condition_variable m_cond1;
boost::mutex::scoped_lock m_lock1 = boost::mutex::scoped_lock(m_log1);
sem_t _sem_sock;
enum { max_length1 = 1024 };
char data_1[max_length1];
void handle_read1(std::shared_ptr<tcp::socket> sock, const boost::system::error_code& error,
size_t bytes_transferred)
{
printf("handle_read1 : error : %s : %d, thread id is: %ld, pid : %d \n", error.category().name(), error.value(), (long int)syscall(SYS_gettid), getpid());
boost::asio::write(*(sock.get()), boost::asio::buffer(data_1, bytes_transferred));
}
void sock_dispatch() {
int v_size = 0;
std::shared_ptr<tcp::socket> curr_sock;
printf("sock_dispatch started. The ID of this of this thread is: %ld, pid : %d \n", (long int)syscall(SYS_gettid), getpid());
while(1) {
while(1) {
sem_wait(&_sem_sock);
v_size = q_sock.size();
sem_post(&_sem_sock);
if(v_size <= 0)
m_cond1.timed_wait(m_lock1,boost::posix_time::milliseconds(5000));
else
break;
}
sem_wait(&_sem_sock);
curr_sock = q_sock.front();
q_sock.pop();
sem_post(&_sem_sock);
curr_sock->async_read_some(boost::asio::buffer(data_1, max_length1),
boost::bind(handle_read1, curr_sock,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
}
}
class session
{
public:
session(boost::asio::io_service& io_service)
: sockptr(new tcp::socket(io_service)) {}
void start()
{
printf("START NEW SESSION The ID of this of this thread is: %ld, pid : %d \n", (long int)syscall(SYS_gettid), getpid());
sem_wait(&_sem_sock);
q_sock.push(sockptr);
sem_post(&_sem_sock);
m_cond1.notify_all();
}
std::shared_ptr<tcp::socket> sockptr;
};
class server
{
public:
server(boost::asio::io_service& io_service, short port)
: io_service_(io_service),
acceptor_(io_service, tcp::endpoint(tcp::v4(), port))
{
session* new_session = new session(io_service_);
acceptor_.async_accept(*(new_session->sockptr.get()),
boost::bind(&server::handle_accept, this, new_session,
boost::asio::placeholders::error));
printf("WAITING TO ACCEPT: The ID of this of this thread is: %ld, pid : %d \n", (long int)syscall(SYS_gettid), getpid());
}
void handle_accept(session* new_session,
const boost::system::error_code& error)
{
new_session->start();
new_session = new session(io_service_);
acceptor_.async_accept(*(new_session->sockptr.get()),
boost::bind(&server::handle_accept, this, new_session,
boost::asio::placeholders::error));
}
private:
boost::asio::io_service& io_service_;
tcp::acceptor acceptor_;
};
int main(int argc, char* argv[])
{
sem_init(&_sem_sock, 0, 1);
boost::asio::io_service io_service;
using namespace std;
server s(io_service, atoi(argv[1]));
boost::thread t(boost::bind(sock_dispatch));
io_service.run();
return 0;
}
该代码是从boost :: asio示例http://www.boost.org/doc/libs/1_39_0/doc/html/boost_asio/example/echo/async_tcp_echo_server.cpp修改而来的。客户端代码为http://www.boost.org/doc/libs/1_39_0/doc/html/boost_asio/example/echo/blocking_tcp_echo_client.cpp。
当客户端连接时,服务器的输出:
WAITING TO ACCEPT: The ID of this of this thread is: 3843, pid : 3843
sock_dispatch started. The ID of this of this thread is: 3844, pid : 3843
START NEW SESSION The ID of this of this thread is: 3843, pid : 3843
handle_read1 : error : system : 0, thread id is: 3843, pid : 3843
在这种情况下,调度程序线程ID为3944,但是handle_read1在线程3843中运行。
理想情况下,handle_read1应该在调度程序中运行,因此它不会在侦听器中阻止接受。
知道我该怎么做吗?还是有更好的整体设计:)?
最佳答案
如果需要在特定线程中调用特定处理程序,请使用不同的io_service
对象。例如,acceptor
可以用io_service1
构造,而套接字可以用io_service2
构造。然后,主线程可以执行io_service1.run()
,而线程池中的线程可以执行io_service2.run()
。
话虽如此,混合异步和同步功能可能会非常棘手。在我研究过的大多数异步程序中,几乎没有必要将线程专用于特定的异步链。
总的来说,我认为概念设计很好,但是我对实现有一些建议:q_sock
使用者代码和生产者代码是高层构造和较低层构造的组合。条件变量的使用有点不习惯,它引起了一个问题,即为什么使用sem_t
代替boost::mutex
并锁定。例如,以下消费者和生产者代码:
// Consumer
while(1)
{
sem_wait(&_sem_sock);
v_size = q_sock.size();
sem_post(&_sem_sock);
if (v_size <= 0)
m_cond1.timed_wait(m_lock1, boost::posix_time::milliseconds(5000));
else
break;
}
sem_wait(&_sem_sock);
curr_sock = q_sock.front();
q_sock.pop();
sem_post(&_sem_sock);
// Producer
sem_wait(&_sem_sock);
q_sock.push(sockptr);
sem_post(&_sem_sock);
m_cond1.notify_all();
可以不使用
sem_t
进行重写,并且基于Boost.Thread的condition_variable
文档更加惯用。考虑替代方案:// Consumer
boost::unique_lock<boost::mutex> lock(m_log1);
while (q_sock.empty())
{
m_cond1.wait(lock);
}
curr_sock = q_sock.front();
q_sock.pop();
lock.unlock();
// Producer
{
boost::lock_guard<boost::mutex> lock(m_log1);
q_sock.push(sockptr);
}
m_cond1.notify_all();
目前尚不清楚
session
提供什么功能。它似乎仅用作分配套接字并将其放入队列的一种手段。为什么不直接分配套接字并让调用者将其放入队列呢?
session::sockptr
是通过智能指针管理的,但session
不是。在不通过智能指针管理session
的情况下,server::handle_accept
中发生内存泄漏,因为session
的句柄在重新分配中丢失。确定
session
提供的功能,并围绕该功能设计界面。如果要提供封装,则非成员函数(例如
handle_read1
)可能需要成为成员函数。如果
session
有其自己的异步链,并将其自身提供给处理程序,则考虑使用enable_shared_from_this
。 Boost.Asio tutorial提供了一个示例用法,一些examples也是如此。目前,
async_read_some
并不表示已准备好读取哪个套接字。在ReadHandler
被调用时,数据已被读取。这是Proactor和Reactor之间的根本区别。如果需要Reactor样式操作,请使用
boost::asio::null_buffers
。有关更多详细信息,请参见this文档。但是,每种方法都有其后果。因此,至关重要的是要了解这些后果,以便做出最佳决策。通过Boost.Asio通过高级构造提供事件多路分解,
sock_dispatch
线程似乎不切实际。 session::start
成员函数可以在套接字上启动异步读取。这个微小的更改将消除对q_sock
的需要,并且消除了示例代码中的所有同步结构。检查为什么必须使用同步写入。如示例所示,在回显客户端的情况下,通常可以通过控制异步链自身的流以除去资源争用来使用异步写入。这允许每个连接具有其自己的缓冲区,该缓冲区可用于读取和写入。
不要预先优化。由于反向控制流,异步编程本来就更难调试。尝试针对吞吐量进行预优化只会加剧复杂性问题。程序运行后,执行吞吐量测试。如果结果不符合要求,则进行分析以确定瓶颈。以我的经验,大多数具有高吞吐量的服务器都将在不受CPU约束之前先受I / O约束。
关于c++ - boost::asio::async_read_some在父线程中运行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/14542859/