c++ - 锁定步进 pthread 互斥锁

标签 c++ c multithreading pthreads

我不知道这是否是一种好的做法,但我正在处理实时输入数据流,并以锁步顺序使用 pthreads,以允许一次一个线程同时执行不同的操作。这是我的每个线程的程序流程:

void * my_thread() {
    pthread_mutex_lock(&read_mutex);

    /*
    read data from a stream such as stdin into global buffer
    */

    pthread_mutex_lock(&operation_mutex);
    pthread_mutex_unlock(&read_mutex);

    /*
     perform some work on the data you read 
    */

   pthread_mutex_lock(&output_mutex);
   pthread_mutex_unlock(&operation_mutex);

   /*
    Write the data to output such as stdout
    */
   pthread_mutex_unlock(&output_mutex);
}

我知道有 pthread 条件锁,但是我的方法是好主意还是坏主意?我在各种大小的流上对此进行了测试,并且我正在尝试考虑极端情况来造成这种僵局,产生竞争条件,或两者兼而有之。我知道互斥体不能保证线程顺序执行,但我需要帮助来考虑打破这个问题的场景。

更新:

我放弃了这一点,但最近有一段时间重新思考了这一点。我使用 C++ 线程和互斥体重写了代码。我正在尝试使用条件变量,但没有这样的运气。这是我解决问题的方法:

void my_thread_v2() {
    //Let only 1 thread read in at a time
    std::unique_lock<std::mutex> stdin_lock(stdin_mutex);
    stdin_cond.wait(stdin_lock);

    /*
    Read from stdin stream
    */

    //Unlock the stdin mutex
    stdin_lock.unlock();
    stdin_cond.notify_one();

    //Lock step
    std::unique_lock<std::mutex> operation_lock(operation_mutex);
    operation_cond.wait(operation_lock);

    /*
     Perform work on the data that you read in
     */

    operation_lock.unlock();
    operation_cond.notify_one();

    std::unique_lock<std::mutex> stdout_lock(stdout_mutex);
    stdout_cond.wait(stdout_lock);

    /*
     Write the data out to stdout
     */

    //Unlock the stdout mutex
    stdout_lock.unlock();
    stdout_cond.notify_one();
}

我知道这段代码的问题是无法发出第一个条件的信号。我绝对不理解条件变量的正确使用。我查看了 cpp 引用文献中的各种示例,但似乎无法摆脱这样的想法:最初的方法可能是做我想做的事情的唯一方法,即锁定线程。有人可以解释一下吗?

更新2:

因此,我实现了一个简单的 Monitor 类,它利用 C++ condition_variableunique_lock:

class ThreadMonitor{
public:
    ThreadMonitor() : is_occupied(false) {}
    void Wait() {
        std::unique_lock<std::mutex> lock(mx);
        while(is_occupied) {
            cond.wait(lock);
        }
        is_occupied = true;
    }

    void Notify() {
        std::unique_lock<std::mutex> lock(mx);
        is_occupied = false;
        cond.notify_one();
    }

private:
    bool is_occupied;
    std::condition_variable cond;
    std::mutex mx;
};

这是我最初的方法,假设我有三个名为 stdin_monoperation_monstdout_monThreadMonitor:

void my_thread_v3() {
    //Let only 1 thread read in at a time
    stdin_mon.Wait();

    /*
    Read from stdin stream
    */

    stdin_mon.Notify();

    operation_mon.Wait();

    /*
     Perform work on the data that you read in
     */

    operation_mon.Notify();

    stdout_mon.Wait();
    /*
     Write the data out to stdout
     */

    //Unlock the stdout
    stdout_mon.notify();
}

问题在于数据仍然被损坏,因此我必须改回锁定步进线程的原始逻辑:

void my_thread_v4() {
    //Let only 1 thread read in at a time
    stdin_mon.Wait();

    /*
    Read from stdin stream
    */

    operation_mon.Wait();
    stdin_mon.Notify();

    /*
     Perform work on the data that you read in
     */

    stdout_mon.Wait();
    operation_mon.Notify();

    /*
     Write the data out to stdout
     */

    //Unlock the stdout
    stdout_mon.notify();
}

我开始怀疑,如果线程顺序很重要,那么这是处理它的唯一方法。我还想知道使用使用 condition_variable 的监视器比仅使用互斥体有什么好处。

最佳答案

您的方法的问题在于,当另一个线程正在读取数据时您仍然可以修改数据:

  1. 线程A获取读,然后操作并再次释放读,并开始写入一些数据,但被中断。
  2. 现在线程 B 运行,获取 read 并可以读取部分修改的、可能不一致的数据!

我假设您希望允许多个线程在不阻塞的情况下读取相同的数据,但是一旦写入,数据就应该受到保护。最后,在输出数据时,我们只是再次读取修改后的数据,因此可以再次并发执行此操作,但需要防止同时写入。

您可以使用读/写互斥体来更好地做到这一点,而不是使用多个互斥体实例:

  1. 任何仅读取数据的函数都会获取读锁。
  2. 任何打算写入的函数从一开始就获取写锁(请注意,先获取读锁,然后再获取写锁,而不释放其间的读锁可能会导致死锁;不过,如果您在中间释放了读锁,那么您的数据处理就需要稳健,以防止数据被另一个线程在中间修改!)。
  3. 将写锁减少为共享而不在其间释放是安全的,因此我们现在可以在输出之前执行此操作。如果在写入数据和输出数据之间不得修改数据,我们甚至需要在不完全释放锁的情况下执行此操作。

最后一点是有问题的,因为 C++ 标准的线程支持库和 pthreads 库都不支持。

对于 C++ boost 提供 solution ;如果您不想或不能(C!)使用 boost,一个简单但可能不是最有效的方法是通过另一个互斥体保护获取写锁:

  1. 获取保护读写互斥体的标准(非读写)互斥体
  2. 获取RW互斥体进行写入
  3. 释放保护互斥锁
  4. 读取数据、写入修改数据
  5. 获取保护互斥体
  6. 释放RW互斥体
  7. 重新获取RW互斥体进行读取;另一个线程是否也获取读取并不重要,我们只需要防止此处的写入锁定
  8. 释放保护互斥锁
  9. 输出
  10. 释放RW互斥体(无需保护)...

非修改函数只需获取读锁,无需任何进一步的保护,与...没有任何冲突

在 C++ 中,您更喜欢使用 thread support library另外,免费获得平台无关的代码,在 C 中,您将使用标准 pthread 互斥体来保护获取写锁,就像您之前所做的那样,并使用 pthread 中的 RW 变体用于读写锁。

关于c++ - 锁定步进 pthread 互斥锁,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51704318/

相关文章:

在 C 中将字符串切成 2 个字符字符串的一部分,然后转换为十六进制格式(char * 到 unsigned char)

java - 从 Handler Runnable 更新已销毁的 Activity UI

java - BindException 多线程服务器

java - Shiro 的 DefaultPasswordService 线程安全吗?

python 2 - 为什么 'with' 在嵌入式 c 代码中的行为不同?

c++ - 如何根据行总数的条件删除 Armadillo 中 SpMat<unsigned int> 的行?

c++ - 使用来自 X509 证书 c++ 的公钥进行 RSA 公共(public)加密

c++ - 为什么我的数组代码反转不能正常工作

通过 C 程序与 cmd session 进行对话

c - 在稀疏图(矩阵)的表示中查找元素(邻居)