c++ - 如何实现信号量?这个实现是正确的还是错误的?

标签 c++ synchronization semaphore

我想实现信号量类。 stackoverflow 上的一位用户注意到我的实现工作不正常。

一开始我是这样做的:

class sem_t {
    int count;
public:
    sem_t(int _count = 0) : count(_count) {};
    void up() {
        this->count++;
    }
    void down() {
        while (this->count == 0)
            std::this_thread::yield();
        this->count--;
    }
};

然后 stackoverflow 上的一位用户注意到这个实现是错误的,因为我从任何同步原语中读取和写入变量 count 并且在某些时候该值可能变得不正确并且在编译器优化的情况下编译器可以假设变量 count 不能被另一个线程修改。所以,我尝试将互斥锁添加到这个结构中,我是这样做的:

class sem_t {
    int count;
    std::mutex mutualExclusion;
public:
    sem_t(int _count = 0) : count(_count) {};
    void up() {
        this->mutualExclusion.lock();
        this->count++;
        this->mutualExclusion.unlock();
    }
    void down() {
                this->mutualExclusion.lock();
        while (this->count == 0)
            std::this_thread::yield();
        this->count--;
        this->mutualExclusion.unlock();
    }
};

但是如果使用这种方法,当我尝试分离线程时,我收到一条错误消息,指出互斥量在忙碌时已被破坏,导致一个线程可以持有互斥量,然后让出,然后线程被分离并发生错误(这是解决方案好吗?)。 然后我尝试修改这段代码并停止了以下构造:

class sem_t {
    int count;
    std::mutex mutualExclusion;
public:
    sem_t(int _count = 0) : count(_count) {};
    void up() {
        this->mutualExclusion.lock();
        this->count++;
        this->mutualExclusion.unlock();
    }
    void down() {
        while (this->count == 0)
            std::this_thread::yield();
        this->mutualExclusion.lock();
        this->count--;
        this->mutualExclusion.unlock();
    }
};

但我认为这个解决方案也有问题,因为它会导致与第一个解决方案相同的问题。

那么,正确的实现是什么? 我想指出,我尝试使用条件变量实现,但我正在尝试在没有条件变量的情况下实现信号量,如果你想用建议一些解决方案条件变量 请描述条件变量的等待方法是如何工作的。

[编辑]

我的完整代码,使用自己实现的信号量:

#include "pch.h"
#include <iostream>
#include <vector>
#include <mutex>
#include <thread>
#include <chrono>

class sem_t {
    int count;
    std::mutex mutualExc;
public:
    sem_t(int _count = 0) : count(_count) {};
    void up() {
        mutualExc.lock();
        this->count++;
        mutualExc.unlock();
    }
    void down() {
        mutualExc.lock();
        while (this->count == 0) {
            mutualExc.unlock();
            std::this_thread::yield();
            mutualExc.lock();
        }
        this->count--;
        mutualExc.unlock();
    }
};

#define N 5
#define THINKING 0
#define HUNGRY 1
#define EATING 2

std::mutex mx;
std::mutex coutMX;
char philosopherState[N] = { THINKING };
sem_t philosopherSemaphores[N] = { 0 };

void testSetState(short i) {
    if (philosopherState[i] == HUNGRY && philosopherState[(i + 1) % N] != EATING && philosopherState[(i + N - 1) % N] != EATING) {
        philosopherState[i] = EATING;
        philosopherSemaphores[i].up();
    }
}

void take_forks(short i) {
    ::mx.lock();
    philosopherState[i] = HUNGRY;
    
    testSetState(i);
    ::mx.unlock();
    philosopherSemaphores[i].down();
}
void put_forks(short i) {
    ::mx.lock();
    philosopherState[i] = THINKING;

    testSetState((i + 1) % N);
    testSetState((i + N - 1) % N);
    ::mx.unlock();
}

void think(short p) {
    for (short i = 0; i < 5; i++) {
        coutMX.lock();
        std::cout << "Philosopher N" << p << " is thinking!" << std::endl;
        coutMX.unlock();
        std::this_thread::sleep_for(std::chrono::milliseconds(500));
    }
}
void eat(short p) {
    for (short i = 0; i < 5; i++) {
        coutMX.lock();
        std::cout << "Philosopher N" << p << " is eating!" << std::endl;
        coutMX.unlock();
        std::this_thread::sleep_for(std::chrono::milliseconds(500));
    }
}

void philosopher(short i) {
    while (1) {
        think(i);
        take_forks(i);
        eat(i);
        put_forks(i);
    }
}

int main()
{
    std::vector<std::thread*> threadsVector;
    for (int i = 0; i < N; i++) {
        threadsVector.push_back(new std::thread([i]() { philosopher(i); }));
    }

    std::this_thread::sleep_for(std::chrono::milliseconds(15000));

    for (int i = 0; i < N; i++) {
        threadsVector[i]->detach();
    }

    return 0;
}

正在发生的错误(仅在 visual studio 中以发布或 Debug模式运行程序时发生)

This error only occurs while running program in visual studio

The console when error occurs

最佳答案

最后的尝试确实不正确,因为可能会出现几个线程同时调用down,都成功通过

    while (this->count == 0)
        std::this_thread::yield();

行,然后它们都会将计数器递减到可能的负值:

    this->mutualExclusion.lock();
    this->count--;
    this->mutualExclusion.unlock();

因此,计数器值的检查和更新必须以原子方式执行。

如果你想保持繁忙的循环,最简单的方法就是在 yield 之前调用 unlock 并在 yield 之后调用 lock,所以比较和递减将在同一个锁下执行:

void down() {
    this->mutualExclusion.lock();
    while (this->count == 0) {
        this->mutualExclusion.unlock();
        std::this_thread::yield();
        this->mutualExclusion.lock();
    }
    this->count--;
    this->mutualExclusion.unlock();
}

此外,您还可以使用 std::unique_lock 守卫,它在构造函数中锁定提供的互斥量并在析构函数中解锁,这样互斥量就不会意外地处于锁定状态:

void down() {
    std::unique_lock<std::mutex> lock(this->mutualExclusion);
    while (this->count == 0) {
        lock.unlock();
        std::this_thread::yield();
        lock.lock();
    }
    this->count--;
}

要处理“muxed destroyed while busy”错误,您需要有一个标志来停止后台线程并等待它们完成 join 而不是分离:

#include <atomic>

...

std::atomic<bool> stopped{ false };

void philosopher(short i) {
    while (!stopped) {
        ...
    }
}

...

int main()
{
    ...

    stopped = true;
    for (int i = 0; i < N; i++) {
        threadsVector[i]->join();
    }
    return 0;
}

或者如果您真的不想释放静态资源,您可以调用 std::quick_exit而不是 detachreturn:

int main()
{
    ...
    std::this_thread::sleep_for(std::chrono::milliseconds(15000));
    std::quick_exit(0);
}

关于c++ - 如何实现信号量?这个实现是正确的还是错误的?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56598561/

相关文章:

C- 信号量 sem_getvalue 没有返回我所期望的?

c - 使用 3 个信号量的多个生产者和消费者

C++找不到包含

c++ - 试图让 C++ 程序循环回到代码中的另一点

java - 当两个锁定的线程(通过变量)切换其中一个变量并尝试访问另一个变量时,会发生什么?

c++ - 为什么在 C++0x 或 Boost.Thread 中没有针对多个互斥锁的作用域锁?

Javascript - callCom 方法阻止 Firefox 中的动画 gif 和计时器

c - 多线程 printf() 与 c 中的信号量

c++ - 简单递归题

c++ - 对于易于编码的平衡二叉搜索树,我应该选择什么?