例如我有一个 EventGenerator
调用 IEventHandler::onEvent
的类对于所有已注册的事件处理程序:
class IEventHandler {
public: virtual void onEvent(...) = 0;
};
class EventGenerator {
private:
std::vector<IEventHandler*> _handlers;
std::mutex _mutex; // [1]
public:
void AddHandler(IEventHandler* handler) {
std::lock_guard<std::mutex> lck(_mutex); // [2]
_handlers.push_back(handler);
}
void RemoveHanler(IEventHandler* handler) {
std::lock_guard<std::mutex> lck(_mutex); // [3]
// remove from "_handlers"
}
private:
void threadMainTask() {
while(true) {
// Do some work ...
// Post event to all registered handlers
{
std::lock_guard<std::mutex> lck(_mutex); // [4]
for(auto& h : _handlers) { h->onEvent(...); )
}
// Do some work ...
}
}
代码应该以下列方式是线程安全的:
- 一个 线程正在执行
EventGenerator::threadMainTask
- 许多 线程可能会访问
EventGenerator::AddHandler
和EventGenerator::RemoveHandler
API。
为了支持这一点,我进行了以下同步(请参阅代码中的注释):
-
[1]
是保护 vector 的互斥体_handlers
来自多线程访问。 -
[2]
和[3]
保护同时添加或删除处理程序。 -
[4]
防止在主线程发布事件时更改 vector 。
此代码一直有效直到... 如果由于某种原因,在执行 IEventHandler::onEvent(...)
期间代码试图调用 EventManager::RemoveHandler
或 EventManager::AddHandler
.结果是运行时异常。
以线程安全方式处理事件处理程序注册和执行事件处理程序回调的最佳方法是什么?
>> 更新<<
因此,根据输入,我已更新为以下设计:
class IEventHandler {
public: virtual void onEvent(...) = 0;
};
class EventDelegate {
private:
IEventHandler* _handler;
std::atomic<bool> _cancelled;
public:
EventDelegate(IEventHandler* h) : _handler(h), _cancelled(false) {};
void Cancel() { _cancelled = true; }
void Invoke(...) { if (!_cancelled) _handler->onEvent(...); }
}
class EventGenerator {
private:
std::vector<std::shared_ptr<EventDelegate>> _handlers;
std::mutex _mutex;
public:
void AddHandler(std::shared_ptr<EventDelegate> handler) {
std::lock_guard<std::mutex> lck(_mutex);
_handlers.push_back(handler);
}
void RemoveHanler(std::shared_ptr<EventDelegate> handler) {
std::lock_guard<std::mutex> lck(_mutex);
// remove from "_handlers"
}
private:
void threadMainTask() {
while(true) {
// Do some work ...
std::vector<std::shared_ptr<EventDelegate>> handlers_copy;
{
std::lock_guard<std::mutex> lck(_mutex);
handlers_copy = _handlers;
}
for(auto& h : handlers_copy) { h->Invoke(...); )
// Do some work ...
}
}
如您所见,还有一个附加类 EventDelegate
有两个目的:
- 保持事件回调
- 启用取消回调
在threadMainTask
,我正在使用 std::vector<std::shared_ptr<EventDelegate>>
的本地拷贝并且我在调用回调之前释放锁。此方法解决了 IEventHandler::onEvent(...)
期间的问题EventGenerator::{AddHandler,RemoveHanler}
被称为。
对新设计有什么想法吗?
最佳答案
在 shared_ptr 的原子交换上实现的写时复制 vector (假设回调注册发生的频率远低于通知回调的事件):
using callback_t = std::shared_ptr<std::function<void(event_t const&)> >;
using callbacks_t = std::shared_ptr<std::vector<callback_t> >;
callbacks_t callbacks_;
mutex_t mutex_; // a mutex of your choice
void register(callback_t cb)
{
// the mutex is to serialize concurrent callbacks registrations
// this is not always necessary, as depending on the application
// architecture, single writer may be enforced by design
scoped_lock lock(mutex_);
auto callbacks = atomic_load(&callbacks_);
auto new_callbacks = std::make_shared< std::vector<callback_t> >();
new_callbacks->reserve(callbacks->size() + 1);
*new_callbacks = callbacks;
new_callbacks->push_back(std::move(cb));
atomic_store(&callbacks_, new_callbacks);
}
void invoke(event_t const& evt)
{
auto callbacks = atomic_load(&callbacks_);
// many people wrap each callback invocation into a try-catch
// and de-register on exception
for(auto& cb: *callbacks) (*cb)(evt);
}
特别是在取消注册时执行回调时的异步行为主题,这里最好的方法是记住 Separation of Concerns原则。
回调在被执行之前不能结束。这是通过另一个称为“额外级别的间接”的经典技巧来实现的。也就是说,不是注册用户提供的回调,而是将其包装成类似下面的东西,除了更新 vector 之外,回调注销将在回调包装器上调用下面定义的 discharge()
方法,甚至会通知注销方法调用方回调执行是否成功。
template <class CB> struct cb_wrapper
{
mutable std::atomic<bool> done_;
CB cb_;
cb_wrapper(CB&& cb): cb(std::move(cb_)) {}
bool discharge()
{
bool not_done = false;
return done_.compare_exchange_strong(not_done, true);
}
void operator()(event_t const&)
{
if (discharge())
{
cb();
}
}
};
关于c++ - 如何在 C++ 中处理线程安全的回调注册和执行?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/23997287/