c++ - 如何在 C++ 中处理线程安全的回调注册和执行?

标签 c++ multithreading design-patterns c++11

例如我有一个 EventGenerator调用 IEventHandler::onEvent 的类对于所有已注册的事件处理程序:

class IEventHandler {
public: virtual void onEvent(...) = 0;
};


class EventGenerator {
private: 
   std::vector<IEventHandler*> _handlers;
   std::mutex _mutex; // [1]
public:
   void AddHandler(IEventHandler* handler) {
      std::lock_guard<std::mutex> lck(_mutex); // [2]
      _handlers.push_back(handler);
   }
   void RemoveHanler(IEventHandler* handler) {
      std::lock_guard<std::mutex> lck(_mutex); // [3]
      // remove from "_handlers"
   }
private:
   void threadMainTask() {

      while(true) {

         // Do some work ...

         // Post event to all registered handlers
         {
            std::lock_guard<std::mutex> lck(_mutex); // [4]
            for(auto& h : _handlers) { h->onEvent(...); )
         }

         // Do some work ...

      }
    }

代码应该以下列方式是线程安全的:

  • 一个 线程正在执行 EventGenerator::threadMainTask
  • 许多 线程可能会访问 EventGenerator::AddHandlerEventGenerator::RemoveHandler API。

为了支持这一点,我进行了以下同步(请参阅代码中的注释):

  • [1]是保护 vector 的互斥体 _handlers来自多线程访问。
  • [2][3]保护同时添加或删除处理程序。
  • [4]防止在主线程发布事件时更改 vector 。

此代码一直有效直到... 如果由于某种原因,在执行 IEventHandler::onEvent(...) 期间代码试图调用 EventManager::RemoveHandlerEventManager::AddHandler .结果是运行时异常。

以线程安全方式处理事件处理程序注册和执行事件处理程序回调的最佳方法是什么?




>> 更新<<

因此,根据输入,我已更新为以下设计:

class IEventHandler {
public: virtual void onEvent(...) = 0;
};

class EventDelegate {
private: 
   IEventHandler* _handler;
   std::atomic<bool> _cancelled;
public:
   EventDelegate(IEventHandler* h) : _handler(h), _cancelled(false) {};
   void Cancel() { _cancelled = true; }
   void Invoke(...) { if (!_cancelled) _handler->onEvent(...); }
}

class EventGenerator {
private: 
   std::vector<std::shared_ptr<EventDelegate>> _handlers;
   std::mutex _mutex;
public:
   void AddHandler(std::shared_ptr<EventDelegate> handler) {
      std::lock_guard<std::mutex> lck(_mutex);
      _handlers.push_back(handler);
   }
   void RemoveHanler(std::shared_ptr<EventDelegate> handler) {
      std::lock_guard<std::mutex> lck(_mutex);
      // remove from "_handlers"
   }
private:
   void threadMainTask() {

      while(true) {

         // Do some work ...

         std::vector<std::shared_ptr<EventDelegate>> handlers_copy;

         {
            std::lock_guard<std::mutex> lck(_mutex);
            handlers_copy = _handlers;
         }

         for(auto& h : handlers_copy) { h->Invoke(...); )

         // Do some work ...

      }
    }

如您所见,还有一个附加类 EventDelegate有两个目的:

  1. 保持事件回调
  2. 启用取消回调

threadMainTask ,我正在使用 std::vector<std::shared_ptr<EventDelegate>> 的本地拷贝并且我在调用回调之前释放锁。此方法解决了 IEventHandler::onEvent(...) 期间的问题EventGenerator::{AddHandler,RemoveHanler}被称为。

对新设计有什么想法吗?

最佳答案

在 shared_ptr 的原子交换上实现的写时复制 vector (假设回调注册发生的频率远低于通知回调的事件):

using callback_t = std::shared_ptr<std::function<void(event_t const&)> >;
using callbacks_t = std::shared_ptr<std::vector<callback_t> >;
callbacks_t callbacks_;
mutex_t mutex_; // a mutex of your choice

void register(callback_t cb)
{
    // the mutex is to serialize concurrent callbacks registrations
    // this is not always necessary, as depending on the application
    // architecture, single writer may be enforced by design
    scoped_lock lock(mutex_);

    auto callbacks = atomic_load(&callbacks_);

    auto new_callbacks = std::make_shared< std::vector<callback_t> >();
    new_callbacks->reserve(callbacks->size() + 1);
    *new_callbacks = callbacks;
    new_callbacks->push_back(std::move(cb));

    atomic_store(&callbacks_, new_callbacks);
}

void invoke(event_t const& evt)
{
    auto callbacks = atomic_load(&callbacks_);

    // many people wrap each callback invocation into a try-catch
    // and de-register on exception
    for(auto& cb: *callbacks) (*cb)(evt); 
}

特别是在取消注册时执行回调时的异步行为主题,这里最好的方法是记住 Separation of Concerns原则。

回调在被执行之前不能结束。这是通过另一个称为“额外级别的间接”的经典技巧来实现的。也就是说,不是注册用户提供的回调,而是将其包装成类似下面的东西,除了更新 vector 之外,回调注销将在回调包装器上调用下面定义的 discharge() 方法,甚至会通知注销方法调用方回调执行是否成功。

template <class CB> struct cb_wrapper
{
    mutable std::atomic<bool> done_;
    CB cb_;
    cb_wrapper(CB&& cb): cb(std::move(cb_)) {}

    bool discharge()
    {
        bool not_done = false;
        return done_.compare_exchange_strong(not_done, true);
    }

    void operator()(event_t const&)
    {
        if (discharge())
        {
            cb();
        }
    }

};

关于c++ - 如何在 C++ 中处理线程安全的回调注册和执行?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/23997287/

相关文章:

c++ - 如何访问 USB 电视调谐卡等 BDA 设备

c++ - 基于对 vector 中第一个元素的降序排序没有给出期望的结果

java - 从线程内创建的组合框中获取选定的项目

multithreading - 如何处理阻塞调用期间需要释放锁的情况?

C++ 2D vector clear 按预期工作?

c++ - 如何像普通 C 函数一样使用正确的 'this' 指针调用 C++ 类成员函数? (指向类成员函数的指针)

c - pthread 从线程池中的线程退出

c# - 一般设计模式实现建议

linq-to-sql - 与 DDD、存储库模式和相关领域模型作斗争

c# - 命令模式还是策略模式更适合客户端-服务器调用?