c++ - boost::asio async_receive_from UDP 端点在线程之间共享?

标签 c++ multithreading boost udp boost-asio

Boost asio 专门允许多个线程调用 io_service 上的 run() 方法。这似乎是创建多线程 UDP 服务器的好方法。但是,我遇到了一个问题,我正在努力寻找答案。

查看典型的 async_receive_from 调用:

m_socket->async_receive_from(
        boost::asio::buffer(m_recv_buffer),
        m_remote_endpoint,
        boost::bind(
            &udp_server::handle_receive,
            this,
            boost::asio::placeholders::error,
            boost::asio::placeholders::bytes_transferred));

远程端点和消息缓冲区没有传递给处理程序,而是处于更高的范围级别(在我的示例中为成员变量)。在 UDP 消息到达时处理它的代码如下所示:

void dns_server::handle_receive(const boost::system::error_code &error, std::size_t size)
{
    // process message
    blah(m_recv_buffer, size);

    // send something back
    respond(m_remote_endpoint);
}

如果有多个线程在运行,同步是如何进行的?在线程之间共享一个端点和接收缓冲区意味着 asio 在消息同时到达的情况下等待处理程序在单个线程中完成,然后再在另一个线程中调用处理程序。这似乎否定了允许多个线程首先调用 run 的意义。

如果我想获得请求的并发服务,看起来我需要将工作数据包连同端点的拷贝交给一个单独的线程,允许处理程序方法立即返回,以便 asio 可以获取on 并将另一条消息并行传递给另一个调用 run() 的线程。

这似乎有点令人讨厌。我在这里缺少什么?

最佳答案

Having a single end point and receive buffer shared between the threads implies that asio waits for a handler to complete within a single thread

如果您的意思是“使用单线程运行服务时”,那么这是正确的。

否则,情况并非如此。相反,当您同时调用单个服务对象(即套接字,而不是 io_service)上的操作时,Asio 只是说行为是“未定义的”。

That seems to negate the point of allowing multiple threads to call run in the first place.

除非处理需要相当长的时间,否则不会。

第一段介绍Timer.5 sample 似乎很好地阐述了您的主题。

session

要分离特定于请求的数据(缓冲区和端点),您需要一些 session 概念。 Asio 中一个流行的机制是 bound shared_ptr s 或共享自此 session 类(boost 绑定(bind)支持直接绑定(bind)到 boost::shared_ptr 实例)。

避免并发、不同步地访问 m_socket 的成员您可以添加锁或使用 strand上面链接的 Timer.5 示例中记录的方法。

演示

供您欣赏的是 Daytime.6 异步 UDP 日间服务器,修改为与许多服务 IO 线程一起工作。

请注意,从逻辑上讲,仍然只有一个 IO 线程(strand),因此我们没有违反套接字类的文档化线程安全性。

然而,与官方示例不同,响应可能会乱序排队,这取决于 udp_session::handle_request 中实际处理所花费的时间。 .

注意

  • 一个udp_session用于保存每个请求的缓冲区和远程端点的类
  • 一个线程池,能够在多个内核上扩展实际处理(而非 IO)的负载。
#include <ctime>
#include <iostream>
#include <string>
#include <boost/array.hpp>
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/make_shared.hpp>
#include <boost/asio.hpp>
#include <boost/thread.hpp>

using namespace boost;
using asio::ip::udp;
using system::error_code;

std::string make_daytime_string()
{
    using namespace std; // For time_t, time and ctime;
    time_t now = time(0);
    return ctime(&now);
}

class udp_server; // forward declaration

struct udp_session : enable_shared_from_this<udp_session> {

    udp_session(udp_server* server) : server_(server) {}

    void handle_request(const error_code& error);

    void handle_sent(const error_code& ec, std::size_t) {
        // here response has been sent
        if (ec) {
            std::cout << "Error sending response to " << remote_endpoint_ << ": " << ec.message() << "\n";
        }
    }

    udp::endpoint remote_endpoint_;
    array<char, 100> recv_buffer_;
    std::string message;
    udp_server* server_;
};

class udp_server
{
    typedef shared_ptr<udp_session> shared_session;
  public:
    udp_server(asio::io_service& io_service)
        : socket_(io_service, udp::endpoint(udp::v4(), 1313)), 
          strand_(io_service)
    {
        receive_session();
    }

  private:
    void receive_session()
    {
        // our session to hold the buffer + endpoint
        auto session = make_shared<udp_session>(this);

        socket_.async_receive_from(
                asio::buffer(session->recv_buffer_), 
                session->remote_endpoint_,
                strand_.wrap(
                    bind(&udp_server::handle_receive, this,
                        session, // keep-alive of buffer/endpoint
                        asio::placeholders::error,
                        asio::placeholders::bytes_transferred)));
    }

    void handle_receive(shared_session session, const error_code& ec, std::size_t /*bytes_transferred*/) {
        // now, handle the current session on any available pool thread
        socket_.get_io_service().post(bind(&udp_session::handle_request, session, ec));

        // immediately accept new datagrams
        receive_session();
    }

    void enqueue_response(shared_session const& session) {
        socket_.async_send_to(asio::buffer(session->message), session->remote_endpoint_,
                strand_.wrap(bind(&udp_session::handle_sent, 
                        session, // keep-alive of buffer/endpoint
                        asio::placeholders::error,
                        asio::placeholders::bytes_transferred)));
    }

    udp::socket  socket_;
    asio::strand strand_;

    friend struct udp_session;
};

void udp_session::handle_request(const error_code& error)
{
    if (!error || error == asio::error::message_size)
    {
        message = make_daytime_string(); // let's assume this might be slow

        // let the server coordinate actual IO
        server_->enqueue_response(shared_from_this());
    }
}

int main()
{
    try {
        asio::io_service io_service;
        udp_server server(io_service);

        thread_group group;
        for (unsigned i = 0; i < thread::hardware_concurrency(); ++i)
            group.create_thread(bind(&asio::io_service::run, ref(io_service)));

        group.join_all();
    }
    catch (std::exception& e) {
        std::cerr << e.what() << std::endl;
    }
}

结束语

有趣的是,在大多数情况下,您会看到单线程版本的性能一样好,没有理由使设计复杂化。

或者,您可以使用单线程 io_service专用于 IO,如果这确实是 CPU 密集型部分,则使用老式的工作池来对请求进行后台处理。首先,这简化了设计,其次,这可能会 boost IO 任务的吞吐量,因为不再需要协调发布在链上的任务。

关于c++ - boost::asio async_receive_from UDP 端点在线程之间共享?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26703583/

相关文章:

c++ - 这种壁垒的做法对吗?

c++ - C++ 中的二进制 .dat 文件问题

c++ - 如何为宏符号添加前缀?

python - 使用 os 杀死 python 线程

c++ - boost::lockfree::queue 正在耗尽我的 CPU

c++ - VTK:如何为柏拉图式立体着色

c# - 调用卡住我的 Windows 窗体

c++ - C++中 vector 数组的最佳库

c++ - boost::function 和 std::tr1::function 之间是否有重要区别需要了解

c++ - GDB 中用于 C++ 模板(moSTLy boost)的代码更清晰、更漂亮的堆栈跟踪