c++ - 如何使用 POSIX 线程实现阻塞读取

标签 c++ multithreading pthreads producer-consumer

我想实现一个遵循大致如下接口(interface)的生产者/消费者场景:

class Consumer {
private:
    vector<char> read(size_t n) {
        // If the internal buffer has `n` elements, then dequeue them
        // Otherwise wait for more data and try again
    }
public:
    void run() {
        read(10);
        read(4839);
        // etc
    }
    void feed(const vector<char> &more) {
        // Safely queue the data
        // Notify `read` that there is now more data
    }
};

在这种情况下,feedrun 将在不同的线程上运行,而 read 应该是阻塞读取(如 recvfread)。显然,我的双端队列需要某种互斥,并且我需要某种通知系统来通知 read 重试。

我听说条件变量是可行的方法,但我所有的多线程经验都来自于 Windows,我很难全神贯注于它们。

感谢您的帮助!

(是的,我知道返回 vector 效率低下。我们不谈这个。)

最佳答案

此代码尚未准备好投入生产。 不会对任何库调用的结果进行错误检查。

我在 LockThread 中封装了互斥锁的锁定/解锁,因此它是异常安全的。但仅此而已。

此外,如果我认真地这样做,我会将互斥量和条件变量包装在对象中,这样它们就不会在 Consumer 的其他方法中被滥用。但是只要您注意到必须在使用条件变量(以任何方式)之前获取锁,那么这个简单的情况就可以保持原样。

出于兴趣,您是否检查过 boost 线程库?

#include <iostream>
#include <vector>
#include <pthread.h>

class LockThread
{
    public:
    LockThread(pthread_mutex_t& m)
        :mutex(m)
    {
        pthread_mutex_lock(&mutex);
    }
    ~LockThread()
    {
        pthread_mutex_unlock(&mutex);
    }
    private:
        pthread_mutex_t& mutex;
};
class Consumer
{
    pthread_mutex_t     lock;
    pthread_cond_t      cond;
    std::vector<char>   unreadData;
    public:
    Consumer()
    {
        pthread_mutex_init(&lock,NULL);
        pthread_cond_init(&cond,NULL);
    }
    ~Consumer()
    {
        pthread_cond_destroy(&cond);
        pthread_mutex_destroy(&lock);
    }

    private:
        std::vector<char> read(size_t n)
        {
            LockThread  locker(lock);
            while (unreadData.size() < n)
            {
                // Must wait until we have n char.
                // This is a while loop because feed may not put enough in.

                // pthread_cond() releases the lock.
                // Thread will not be allowed to continue until
                // signal is called and this thread reacquires the lock.

                pthread_cond_wait(&cond,&lock);

                // Once released from the condition you will have re-aquired the lock.
                // Thus feed() must have exited and released the lock first.
            }

            /*
             * Not sure if this is exactly what you wanted.
             * But the data is copied out of the thread safe buffer
             * into something that can be returned.
             */
            std::vector<char>   result(n); // init result with size n
            std::copy(&unreadData[0],
                      &unreadData[n],
                      &result[0]);

            unreadData.erase(unreadData.begin(),
                             unreadData.begin() + n);
            return (result);
        }
public:
    void run()
    {
        read(10);
        read(4839);
        // etc
    }
    void feed(const std::vector<char> &more)
    {
        LockThread  locker(lock);

        // Once we acquire the lock we can safely modify the buffer.
        std::copy(more.begin(),more.end(),std::back_inserter(unreadData));

        // Only signal the thread if you have the lock
        // Otherwise race conditions happen.
        pthread_cond_signal(&cond);

        // destructor releases the lock and thus allows read thread to continue.
    }
};


int main()
{
    Consumer    c;
}

关于c++ - 如何使用 POSIX 线程实现阻塞读取,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/206857/

相关文章:

c - 线程不并行运行

嵌入式系统中的多线程

c++ - 哪个模式在矩阵中出现最多-R(更新)

java - 高负载下 Groovy Shell 解释器中的锁争用

c++ - 我可以使用用户定义的转换来将原始指针访问替换为通过类访问吗?

c++ - 非阻塞线程安全堆栈

java - 同步块(synchronized block)实现上的差异

c++ - 共享库和 libpthread.so 的 g++ 问题

c++ - 支持按位运算的精确 X 位大小的变量 (c++)

c++ - 避免繁琐的可选参数