c++ - 在 C++11 中等待多个条件变量的最佳方法是什么?

标签 c++ multithreading c++11 synchronization condition-variable

首先介绍一下上下文:我正在学习 C++11 中的线程,为此,我正在尝试构建一个小的 actor类,本质上(我把异常处理和传播的东西排除在外)像这样:

class actor {
    private: std::atomic<bool> stop;
    private: std::condition_variable interrupt;
    private: std::thread actor_thread;
    private: message_queue incoming_msgs;

    public: actor() 
    : stop(false), 
      actor_thread([&]{ run_actor(); })
    {}

    public: virtual ~actor() {
        // if the actor is destroyed, we must ensure the thread dies too
        stop = true;
        // to this end, we have to interrupt the actor thread which is most probably
        // waiting on the incoming_msgs queue:
        interrupt.notify_all();
        actor_thread.join();
    }

    private: virtual void run_actor() {
        try {
            while(!stop)
                // wait for new message and process it
                // but interrupt the waiting process if interrupt is signaled:
                process(incoming_msgs.wait_and_pop(interrupt));
        } 
        catch(interrupted_exception) {
            // ...
        }
    };

    private: virtual void process(const message&) = 0;
    // ...
};

每个 Actor 都有自己的 actor_thread , 在 incoming_msgs 上等待新的传入消息并且——当消息到达时——处理它。

actor_threadactor 一起创建并且必须和它一起死,这就是为什么我需要在 message_queue::wait_and_pop(std::condition_variable interrupt) 中使用某种中断机制.

基本上,我需要 wait_and_pop阻塞直到 a) 一个新的 message到达或 b) 直到 interrupt被解雇,在这种情况下——理想情况下——interrupted_exception将被抛出。

message_queue 中的新消息到达目前也由 std::condition_variable new_msg_notification 建模:

// ...
// in class message_queue:
message wait_and_pop(std::condition_variable& interrupt) {
    std::unique_lock<std::mutex> lock(mutex);

    // How to interrupt the following, when interrupt fires??
    new_msg_notification.wait(lock,[&]{
        return !queue.empty();
    });
    auto msg(std::move(queue.front()));
    queue.pop();
    return msg;
}

长话短说,问题是这样的:如何中断 new_msg_notification.wait(...) 中的新消息等待当interrupt被触发(不引入超时)?

或者,问题可以理解为:我如何等到两个 std::condition_variable 中的任何一个s 有信号吗?

一种天真的方法似乎是不使用 std::condition_variable完全用于中断,而只使用原子标志std::atomic<bool> interrupted然后忙等待new_msg_notification在新消息到达或 true==interrupted 之前有一个非常小的超时.但是,我非常希望避免忙于等待。


编辑:

从 pilcrow 的评论和回答来看,基本上有两种可能的方法。

  1. 根据 Alan、mukunda 和 pilcrow 的建议,将特殊的“终止”消息排入队列。我决定反对这个选项,因为我不知道我希望 Actor 终止时队列的大小。很有可能(当我想要快速终止某些东西时,大多数情况下)队列中有数千条消息要处理,并且等待它们被处理直到最终终止消息得到它似乎是 Not Acceptable 转。
  2. 实现条件变量的自定义版本,通过将通知转发到第一个线程正在等待的条件变量,可能会被另一个线程中断。我选择了这种方法。

对于那些感兴趣的人,我的实现如下。在我的情况下,条件变量实际上是 semaphore (因为我更喜欢它们,也因为我喜欢这样做的练习)。我为这个信号量配备了一个相关的 interrupt可以通过 semaphore::get_interrupt() 从信号量中获得.如果现在有一个线程阻塞 semaphore::wait() , 另一个线程有可能调用 semaphore::interrupt::trigger()在信号量中断时,导致第一个线程解除阻塞并传播 interrupt_exception .

struct
interrupt_exception {};

class
semaphore {
    public: class interrupt;
    private: mutable std::mutex mutex;

    // must be declared after our mutex due to construction order!
    private: interrupt* informed_by;
    private: std::atomic<long> counter;
    private: std::condition_variable cond;

    public: 
    semaphore();

    public: 
    ~semaphore() throw();

    public: void 
    wait();

    public: interrupt&
    get_interrupt() const { return *informed_by; }

    public: void
    post() {
        std::lock_guard<std::mutex> lock(mutex);
        counter++;
        cond.notify_one(); // never throws
    }

    public: unsigned long
    load () const {
        return counter.load();
    }
};

class
semaphore::interrupt {
    private: semaphore *forward_posts_to;
    private: std::atomic<bool> triggered;

    public:
    interrupt(semaphore *forward_posts_to) : triggered(false), forward_posts_to(forward_posts_to) {
        assert(forward_posts_to);
        std::lock_guard<std::mutex> lock(forward_posts_to->mutex);
        forward_posts_to->informed_by = this;
    }

    public: void
    trigger() {
        assert(forward_posts_to);
        std::lock_guard<std::mutex>(forward_posts_to->mutex);

        triggered = true;
        forward_posts_to->cond.notify_one(); // never throws
    }

    public: bool
    is_triggered () const throw() {
        return triggered.load();
    }

    public: void
    reset () throw() {
        return triggered.store(false);
    }
};

semaphore::semaphore()  : counter(0L), informed_by(new interrupt(this)) {}

// must be declared here because otherwise semaphore::interrupt is an incomplete type
semaphore::~semaphore() throw()  {
    delete informed_by;
}

void
semaphore::wait() {
    std::unique_lock<std::mutex> lock(mutex);
    if(0L==counter) {
        cond.wait(lock,[&]{
            if(informed_by->is_triggered())
                throw interrupt_exception();
            return counter>0;
        });
    }
    counter--;
}

使用这个semaphore ,我的消息队列实现现在看起来像这样(使用信号量而不是 std::condition_variable 我可以摆脱 std::mutex :

class
message_queue {    
    private: std::queue<message> queue;
    private: semaphore new_msg_notification;

    public: void
    push(message&& msg) {
        queue.push(std::move(msg));
        new_msg_notification.post();
    }

    public: const message
    wait_and_pop() {
        new_msg_notification.wait();
        auto msg(std::move(queue.front()));
        queue.pop();
        return msg;
    }

    public: semaphore::interrupt&
    get_interrupt() const { return new_msg_notification.get_interrupt(); }
};

我的actor ,现在能够在其线程中以非常低的延迟中断其线程。目前的实现是这样的:

class
actor {
    private: message_queue
    incoming_msgs;

    /// must be declared after incoming_msgs due to construction order!
    private: semaphore::interrupt&
    interrupt;

    private: std::thread
    my_thread;

    private: std::exception_ptr
    exception;

    public:
    actor()
    : interrupt(incoming_msgs.get_interrupt()), my_thread(
        [&]{
            try {
                run_actor();
            }
            catch(...) {
                exception = std::current_exception();
            }
        })
    {}

    private: virtual void
    run_actor() {
        while(!interrupt.is_triggered())
            process(incoming_msgs.wait_and_pop());
    };

    private: virtual void
    process(const message&) = 0;

    public: void
    notify(message&& msg_in) {
        incoming_msgs.push(std::forward<message>(msg_in));
    }

    public: virtual
    ~actor() throw (interrupt_exception) {
        interrupt.trigger();
        my_thread.join();
        if(exception)
            std::rethrow_exception(exception);
    }
};

最佳答案

你问,

What is the best way to wait on multiple condition variables in C++11?

你不能,而且必须重新设计。一个线程一次只能等待一个条件变量(及其关联的互斥体)。在这方面,Windows 的同步工具比“POSIX 风格”系列的同步原语更丰富。

使用线程安全队列的典型方法是将特殊的“全部完成!”加入队列。消息,或设计一个“可破坏”(或“可关闭”)队列。在后一种情况下,队列的内部条件变量会保护一个复杂的谓词:要么一个项目可用要么队列已损坏。

在评论中你观察到

a notify_all() will have no effect if there is no one waiting

这是真的,但可能不相关。 wait() 条件变量也意味着检查谓词,并在实际阻塞通知之前检查它。因此,一个忙于处理“错过”notify_all() 的队列项目的工作线程将在下一次检查队列条件时看到谓词(新项目可用,或者,队列已完成)已更改。

关于c++ - 在 C++11 中等待多个条件变量的最佳方法是什么?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27341029/

相关文章:

c++ - Poco RefCountedObject 线程安全吗?

multithreading - java :singleton, 静态变量和线程安全

c++ - 使用引用变量修改类私有(private)变量

C++11:为什么私有(private)成员模板可以在类外访问?

c++ - 我应该对字符串使用 unique_ptr 吗?

c++ - 嵌套结构属性继承

c++ - 将哈希函数定义为结构的一部分

c++ - 强制隐式调用非标准删除运算符函数

c++ - 为什么我的成员函数中的 lambda 表达式的主体没有被执行?

c - 如何在线程安全 C 库中高效实现句柄