c++ - 对于 boost io_service,是否只有一个线程在 epoll_wait 上阻塞?

标签 c++ multithreading c++11 boost-asio

我看了Boost ASIO的源码,想知道它只有一个线程调用epoll_wait(当然,如果我用epoll reactor)。
我想找到关于多个线程调用epoll_wait的解决方案,这可能会导致不同的线程同时对同一个套接字进行读取。 我读了一些关键代码如下:

// Prepare to execute first handler from queue.
      operation* o = op_queue_.front();
      op_queue_.pop();
      bool more_handlers = (!op_queue_.empty());

      if (o == &task_operation_)
      {
        task_interrupted_ = more_handlers;

        if (more_handlers && !one_thread_)
          wakeup_event_.unlock_and_signal_one(lock);
        else
          lock.unlock();

        task_cleanup on_exit = { this, &lock, &this_thread };
        (void)on_exit;

        // Run the task. May throw an exception. Only block if the operation
        // queue is empty and we're not polling, otherwise we want to return
        // as soon as possible.
        task_->run(!more_handlers, this_thread.private_op_queue);
      }

task_ 是 epoll react 器,它会在运行中调用 epoll_wait, 我猜它可能只有一个线程调用它,因为 op_queue_ 中只有一个“task_operation_”,对吗?
如果我想在多线程中使用 epoll,或者我可以使用“EPOLLONESHOT”,这样它可以确保一个线程同时处理一个套接字。

最佳答案

  • 第一种情况,当您使用 io_service 的单个实例并从多个线程调用 io_service::run 方法时。

让我们看看 schduler::run 函数(简化):

std::size_t scheduler::run(asio::error_code& ec)
{
  mutex::scoped_lock lock(mutex_);

  std::size_t n = 0;
  for (; do_run_one(lock, this_thread, ec); lock.lock())
    if (n != (std::numeric_limits<std::size_t>::max)())
      ++n;
  return n;
}

因此,在持有锁的情况下,它会调用 do_run_one 方法,类似于:

std::size_t scheduler::do_run_one(mutex::scoped_lock& lock,
    scheduler::thread_info& this_thread,
    const asio::error_code& ec)
{
  while (!stopped_)
  {
    if (!op_queue_.empty())
    {
      // Prepare to execute first handler from queue.
      operation* o = op_queue_.front();
      op_queue_.pop();
      bool more_handlers = (!op_queue_.empty());

      if (o == &task_operation_)
      {
        task_interrupted_ = more_handlers;

        if (more_handlers && !one_thread_)
          wakeup_event_.unlock_and_signal_one(lock);
        else
          lock.unlock();

        task_cleanup on_exit = { this, &lock, &this_thread };
        (void)on_exit;

        task_->run(!more_handlers, this_thread.private_op_queue);
      }
      else
      {
        //......
      }
    }
    else
    {
      wakeup_event_.clear(lock);
      wakeup_event_.wait(lock);
    }
  }

  return 0;
}

代码中有趣的部分是这些行:

if (more_handlers && !one_thread_)
  wakeup_event_.unlock_and_signal_one(lock);
else
  lock.unlock(); 

我们现在讨论的是多线程的情况,所以满足第一个条件(假设我们在op_queue_中有相当多的pending tasks)。

wakeup_event_.unlock_and_signal_one 最终做的是释放/解锁 lock 并通知正在等待条件等待的线程之一。因此,有了这个,至少另一个线程(无论谁获得锁)现在可以调用 do_run_one

如您所说,task_epoll_reactor。而且,在它的 run 方法中,它调用 epoll_wait(不持有 schedulerlock_)。

这里有趣的是当它遍历 epoll_wait 返回的所有就绪描述符时它做了什么。它将它们推回到它在参数中作为引用接收的操作队列中。推送的操作现在具有 descriptor_state 而不是 task_operation_ 的运行时类型:

for (int i = 0; i < num_events; ++i)
  {
    void* ptr = events[i].data.ptr;
    if (ptr == &interrupter_)
    {
      // don't call work_started() here. This still allows the scheduler to
      // stop if the only remaining operations are descriptor operations.
      descriptor_state* descriptor_data = static_cast<descriptor_state*>(ptr);
      descriptor_data->set_ready_events(events[i].events);
      ops.push(descriptor_data);
    }
  }

因此,在 scheduler::do_run_one 内的 while 循环的下一次迭代中,对于已完成的任务,它将到达 else 分支(我在我的粘贴较早):

     else
      {
        std::size_t task_result = o->task_result_;

        if (more_handlers && !one_thread_)
          wake_one_thread_and_unlock(lock);
        else
          lock.unlock();

        // Ensure the count of outstanding work is decremented on block exit.
        work_cleanup on_exit = { this, &lock, &this_thread };
        (void)on_exit;

        // Complete the operation. May throw an exception. Deletes the object.
        o->complete(this, ec, task_result);

        return 1;
      }

调用 complete 函数指针的函数指针可能会调用用户传递给 async_readasync_write API 的句柄。

  • 第二种情况,您创建一个 io_service 对象池并在一个或多个线程上调用其 run 方法,即 io_service 之间的映射> 和 thread 可以是 1:1 或 1:N,这可能适合您的应用程序。通过这种方式,您可以以循环方式将 io_service 对象分配给 soucket 对象。

现在,回到你的问题:

If I wanna use epoll in multi-threading, or I may use "EPOLLONESHOT" so that it can ensure that one thread handle one socket at one time.

如果我没有理解错的话,你想使用 1 个线程将所有事件处理到套接字吗?我认为这可以通过方法 2 实现,即创建一个 io_service 对象池并将其映射到 1 个线程。通过这种方式,您可以确保特定套接字上的所有事件将仅由一个线程处理,即 io_service:run 所在的线程。

您不必担心在上述情况下设置 EPOLLONESHOT

我不太确定使用第一种方法是否能获得相同的行为,这是多线程和 1 个 io_service

但是,如果你根本不使用线程,即你的 io_service 在单线程上运行,那么你不必担心所有这些,毕竟 asio 的目的是抽象所有这些东西。

关于c++ - 对于 boost io_service,是否只有一个线程在 epoll_wait 上阻塞?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40338609/

相关文章:

c++ - Win32 LB_GETTEXT 返回垃圾

c++ - 如何以链式方式在两个容器上构建迭代器

java - 多线程中的监听器仅在匿名传递时调用,而不是在声明为成员时调用

c++ - 停止在线程中运行的类方法

具有 Fluent 界面的 C++ Builder 模式

c++ - 为什么在c++中使用ios::sync_with_stdio(false)后printf先于cout执行?

java - 为什么我们不应该在借用的线程上安排中断?

c++ - 序列容器-为什么只有几个序列容器才支持一些表达式

c++ - 绑定(bind)数据成员在 VS2012 中不起作用?

c++ - 确保参数是控制台的输出流