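c++ - How to handle a condition variable missing a signal from another thread

Tags: c++ c++11

In the compilable code below, I send a query message to be handled by another thread, and I want to wait either for the response or for a timeout to expire. I don't know why wait_until misses the signal and hits the timeout when it shouldn't. This only happens when the handler returns the response very quickly. How would you suggest I fix the code below?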

#include <mutex>
#include <memory>
#include <condition_variable>
#include <atomic>
#include <thread>
#include <iostream>
#include <queue>
#include <chrono>
#include <cstdint>

class Question
{

};

class Answer
{
public:
    bool isAnswered = false;
};

class Query
{
    std::condition_variable     _cv;
    std::mutex                  _mutex;
    std::atomic_bool            _questionAnswered;
    std::atomic_bool            _questionSet;

    std::shared_ptr<Question>   _question;
    std::shared_ptr<Answer>     _answer;

public:
    void setQuestion(std::shared_ptr<Question> & question)
    {
        if(!_questionSet)
        {
            _question = question;
            _questionSet = true;
        }
    };

    void setAnswer(std::shared_ptr<Answer> answer)
    {
        std::unique_lock<std::mutex> lock(_mutex);
        if(!_questionAnswered)
        {
            // Set the answer and notify the getAnswerWithTimeout() to unlock if holding
            _answer = answer;
            _questionAnswered = true;
            lock.unlock();
            _cv.notify_all();
        }
    };

    std::shared_ptr<Answer> getAnswerWithTimeout(uint64_t micros)
    {
        std::unique_lock<std::mutex> lock(_mutex);

        if(!_questionAnswered)
        {
            auto now = std::chrono::system_clock::now();

            // When timeout occurs, lock down this class, set the answer as null, and set error to timeout
            if (!_cv.wait_until(lock, now + std::chrono::microseconds(micros), [&]() { return (bool)_questionAnswered; }) )
            {
                _answer = nullptr;
                _questionAnswered = true;
            }
        }

        return _answer;
    };
};

void function_to_run(std::shared_ptr<Query> query)
{
    // Respond to query and set the answer
    auto answer = std::make_shared<Answer>();
    answer->isAnswered = true;

    // Set the response answer
    query->setAnswer(answer);
}



std::queue<std::shared_ptr<Query>> queryHandler;
bool keepRunning = true;
std::mutex queryHandlerMutex;
std::condition_variable queryHandlerCv;

void handleQueryHandler()
{
    while (true)
    {
        std::shared_ptr<Query> query;

        {
            std::unique_lock<std::mutex> lock(queryHandlerMutex);
            queryHandlerCv.wait(lock, [&] { return !keepRunning || !queryHandler.empty(); });
            if (!keepRunning) {
                return;
            }
            // Pop off item from queue
            query = queryHandler.front();
            queryHandler.pop();
        }

        // Process query with function
        function_to_run(query);
    }
}

void insertIntoQueryHandler(std::shared_ptr<Query> & query)
{
    {
        std::unique_lock<std::mutex> lock(queryHandlerMutex);
    
        // Insert into Query Handler
        queryHandler.emplace(query);
    }
    // Notify query handler to start if locked on empty
    queryHandlerCv.notify_one();
}

std::shared_ptr<Answer>
ask(std::shared_ptr<Query> query, uint64_t timeoutMicros=0)
{
    std::shared_ptr<Answer> answer = nullptr;

    // Send Query to be handled by external thread
    insertIntoQueryHandler(query);

    // Hold for the answer to be returned with timeout period
    answer = query->getAnswerWithTimeout(timeoutMicros);

    return answer;
}


int main()
{
    // Start Up Query Handler thread to handle Queries
    std::thread queryHandlerThread(handleQueryHandler);

    // Create queries in a loop and process each one
    for(int i = 0; i < 1000000; i++)
    {
        auto question = std::make_shared<Question>();
        auto query = std::make_shared<Query>();
        query->setQuestion(question);

        auto answer = ask(query, 1000);
        if(!answer)
        {
            std::cout << "Query Timed out after 1000us" << std::endl;
        }
    }
    // Stop the thread
    {
        std::unique_lock<std::mutex> lock(queryHandlerMutex);
        keepRunning = false;
    }
    queryHandlerCv.notify_one();
    queryHandlerThread.join();
    return 0;
}

Best answer

As discussed in the comments, the main problem here is the timeout period you are using (1 ms). Within this interval:

     auto now = std::chrono::system_clock::now();

.... another thread may sneak in here ....

     if (!_cv.wait_until(lock, now + std::chrono::microseconds(micros), [&]() { return (bool)_questionAnswered; }) )
     {

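another thread can sneak in and consume a time slice (say 10 ms), after which wait_until will time out immediately. There are also reports of unexpected wait_until behaviour, described here: std::condition_variable wait_until surprising behaviour

Increasing the timeout to the order of a few time slices will fix this. You can also adjust the thread priorities.

Personally, I would suggest using wait_for to poll the condition variable, which is efficient and also exits promptly (as opposed to polling a flag and sleeping).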
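
As a rough illustration of the wait_for suggestion, here is a minimal sketch. ReplySlot, ask_with_timeout and the int payload are hypothetical names chosen for this example, not part of the question's code. The flag is a plain bool guarded by the mutex, and the predicate overload of wait_for checks that flag before blocking, so a reply that arrives before the wait starts is still seen.

```cpp
#include <chrono>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical one-shot reply slot (an assumption for illustration,
// not the Query class from the question).
class ReplySlot {
    std::mutex mutex_;
    std::condition_variable cv_;
    std::shared_ptr<int> value_;  // guarded by mutex_
    bool ready_ = false;          // guarded by mutex_

public:
    // Called by the worker thread: publish the value, then wake waiters.
    void set(std::shared_ptr<int> value) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            value_ = std::move(value);
            ready_ = true;
        }
        cv_.notify_all();
    }

    // Called by the asking thread: returns the value, or nullptr if it
    // did not arrive within `timeout`.
    std::shared_ptr<int> get(std::chrono::milliseconds timeout) {
        std::unique_lock<std::mutex> lock(mutex_);
        // The predicate is evaluated before blocking and after every wakeup,
        // so an early notify or a spurious wakeup is handled the same way.
        if (!cv_.wait_for(lock, timeout, [this] { return ready_; })) {
            return nullptr;  // timed out and still not ready
        }
        return value_;
    }
};

// Hypothetical helper: start a worker that replies with `reply`,
// then wait for the reply with a timeout.
std::shared_ptr<int> ask_with_timeout(int reply, std::chrono::milliseconds timeout) {
    ReplySlot slot;
    std::thread worker([&slot, reply] { slot.set(std::make_shared<int>(reply)); });
    auto result = slot.get(timeout);
    worker.join();
    return result;
}
```

A call such as `ask_with_timeout(42, std::chrono::milliseconds(50))` uses a timeout of a few assumed 10 ms time slices rather than 1 ms; the right value depends on the system.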


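Time slices on non-RTOS systems tend to be around 10 ms, so I would not expect such a short timeout to work accurately and predictably on a general-purpose system. For an introduction to preemptive multitasking, see: https://www.geeksforgeeks.org/time-slicing-in-cpu-scheduling/

And this: http://dev.ti.com/tirex/explore/node?node=AL.iEm6ATaD6muScZufjlQ__pTTHBmu__LATEST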
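
To get a feel for how coarse short timeouts are on a given machine, one option is to measure how long a 1 ms sleep actually takes. The sketch below is only an assumption about how one might check this; measured_sleep_us is a hypothetical helper, and the numbers it reports vary by OS, load and scheduler settings.

```cpp
#include <chrono>
#include <thread>

// Hypothetical helper: sleeps for `requested` and returns how long the
// sleep actually took, in microseconds, measured with a monotonic clock.
long long measured_sleep_us(std::chrono::microseconds requested) {
    auto start = std::chrono::steady_clock::now();
    std::this_thread::sleep_for(requested);
    auto elapsed = std::chrono::steady_clock::now() - start;
    return std::chrono::duration_cast<std::chrono::microseconds>(elapsed).count();
}
```

Calling `measured_sleep_us(std::chrono::microseconds(1000))` repeatedly and printing the results shows how far the actual delay overshoots the 1000 us request; if the overshoot is comparable to the timeout itself, a 1 ms deadline is unlikely to be reliable.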


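As jtbandes pointed out, it is worth using tools such as Clang's ThreadSanitizer to check for potential logic races: https://clang.llvm.org/docs/ThreadSanitizer.html

Regarding "c++ - How to handle a condition variable missing a signal from another thread", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/65115508/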
