multithreading - boost asio 处理程序中的长时间运行/阻塞操作

标签 multithreading client-server boost-asio

现在的情况
我使用 boost.asio 实现了一个 TCP 服务器,它当前使用单个 io_service我称之为 run 的对象来自单个线程的方法。
到目前为止,服务器能够立即响应客户端的请求,因为它在内存中拥有所有必要的信息(接收处理程序中不需要长时间运行的操作)。
问题
现在需求发生了变化,我需要从数据库(使用 ODBC)中获取一些信息——这基本上是一个长时间运行的阻塞操作——以便为客户端创建响应。
我看到了几种方法,但我不知道哪种方法最好(而且可能还有更多方法):
第一种方法
我可以在处理程序中保留长时间运行的操作,只需调用 io_service.run()从多个线程。我想我会使用尽可能多的线程,因为我有可用的 CPU 内核?
虽然这种方法很容易实现,但我认为我不会使用这种方法获得最佳性能,因为线程数量有限(由于数据库访问更多是 I/O 绑定(bind)操作,因此大部分时间都处于空闲状态比计算受限的操作)。
第二种方法
this document 的第 6 节中它说:

Use threads for long running tasks

A variant of the single-threaded design, this design still uses a single io_service::run() thread for implementing protocol logic. Long running or blocking tasks are passed to a background thread and, once completed, the result is posted back to the io_service::run() thread.


这听起来很有希望,但我不知道如何实现。任何人都可以为这种方法提供一些代码片段/示例吗?
第三种方法
Boris Schäling 在 section 7.5 of his boost introduction 中解释如何使用自定义服务扩展 boost.asio。
这看起来工作量很大。与其他方法相比,这种方法有什么好处吗?

最佳答案

这些方法不是明确相互排斥的。我经常看到第一个和第二个的组合:

  • 一个或多个线程正在一个中处理网络 I/O io_service .
  • 长时间运行或阻塞的任务发布到不同的 io_service .这个io_service用作线程池,不会干扰处理网络 I/O 的线程。或者,每次需要长时间运行或阻塞的任务时,都可以生成一个分离的线程;但是,线程创建/销毁的开销可能会产生显着影响。

  • 这个answer它提供了一个线程池实现。此外,这是一个基本示例,试图强调两个 io_services 之间的交互。 .

    #include <iostream>
    
    #include <boost/asio.hpp>
    #include <boost/bind.hpp>
    #include <boost/chrono.hpp>
    #include <boost/optional.hpp>
    #include <boost/thread.hpp>
    
    /// @brief Background service will function as a thread-pool where
    ///        long-standing blocking operations may occur without affecting
    ///        the network event loop.
    boost::asio::io_service background_service;
    
    /// @brief The main io_service will handle network operations.
    boost::asio::io_service io_service;
    
    boost::optional<boost::asio::io_service::work> work;
    
    /// @brief ODBC blocking operation.
    ///
    /// @brief data Data to use for query.
    /// @brief handler Handler to invoke upon completion of operation.
    template <typename Handler>
    void query_odbc(unsigned int data,
                    Handler handler)
    {
      std::cout << "in background service, start querying odbc\n";
      std::cout.flush();
      // Mimic busy work.
      boost::this_thread::sleep_for(boost::chrono::seconds(5));
    
      std::cout << "in background service, posting odbc result to main service\n";
      std::cout.flush();
      io_service.post(boost::bind(handler, data * 2));
    }
    
    /// @brief Functions as a continuation for handle_read, that will be
    ///        invoked with results from ODBC.
    void handle_read_odbc(unsigned int result)
    {
      std::stringstream stream;
      stream << "in main service, got " << result << " from odbc.\n";
      std::cout << stream.str();
      std::cout.flush();
    
      // Allow io_service to stop in this example.
      work = boost::none;
    }
    
    /// @brief Mocked up read handler that will post work into a background
    ///        service.
    void handle_read(const boost::system::error_code& error,
                     std::size_t bytes_transferred)
    {
      std::cout << "in main service, need to query odbc" << std::endl;
      typedef void (*handler_type)(unsigned int);
      background_service.post(boost::bind(&query_odbc<handler_type>,
        21,                // data
        &handle_read_odbc) // handler
      );
    
      // Keep io_service event loop running in this example.
      work = boost::in_place(boost::ref(io_service));
    } 
    
    /// @brief Loop to show concurrency.
    void print_loop(unsigned int iteration)
    {
      if (!iteration) return;
    
      std::cout << "  in main service, doing work.\n";
      std::cout.flush();
      boost::this_thread::sleep_for(boost::chrono::seconds(1));
      io_service.post(boost::bind(&print_loop, --iteration));  
    }
    
    int main()
    {
      boost::optional<boost::asio::io_service::work> background_work(
          boost::in_place(boost::ref(background_service)));
    
      // Dedicate 3 threads to performing long-standing blocking operations.
      boost::thread_group background_threads;
      for (std::size_t i = 0; i < 3; ++i)
        background_threads.create_thread(
          boost::bind(&boost::asio::io_service::run, &background_service));
    
      // Post a mocked up 'handle read' handler into the main io_service.
      io_service.post(boost::bind(&handle_read,
        make_error_code(boost::system::errc::success), 0));
    
      // Post a mockup loop into the io_service to show concurrency.
      io_service.post(boost::bind(&print_loop, 5));  
    
      // Run the main io_service.
      io_service.run();
    
      // Cleanup background.
      background_work = boost::none;
      background_threads.join_all();
    }
    

    和输出:

    在主服务中,需要查询 odbc
    在主要服务,做工作。
    在后台服务中,开始查询 odbc
    在主要服务,做工作。
    在主要服务,做工作。
    在主要服务,做工作。
    在主要服务,做工作。
    在后台服务中,将 odbc 结果发布到主服务
    在主要服务中,从 odbc 获得 42。

    注意单线程处理主io_service帖子工作到background_service ,然后继续处理其事件循环,而 background_service block 。一旦background_service得到结果,它将处理程序发布到主 io_service .

    关于multithreading - boost asio 处理程序中的长时间运行/阻塞操作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/17648725/

    相关文章:

    c - 桌面应用程序需要服务器端调试

    c++ - boost .Asio : Is it a good thing to use a `io_service` per connection/socket?

    java - 是否存在线程安全的 statsd 客户端?

    multithreading - 读取大数据 block 时是否会出现竞争条件?

    c - 我想在下面的代码中包含一个 for 循环,它可以每隔 n 秒连续发送数据 id、时间戳和消息

    java - Netbeans 中具有多个可执行文件管理的软件

    c++ - boost::asio::serial_port 设置 RTS DTS

    python - 设计 REST API 架构和实现

    java - ExecutorCompletionService 挂起