我想实现一个遵循大致如下接口(interface)的生产者/消费者场景:
class Consumer {
private:
vector<char> read(size_t n) {
// If the internal buffer has `n` elements, then dequeue them
// Otherwise wait for more data and try again
}
public:
void run() {
read(10);
read(4839);
// etc
}
void feed(const vector<char> &more) {
// Safely queue the data
// Notify `read` that there is now more data
}
};
在这种情况下,feed
和 run
将在不同的线程上运行,而 read
应该是阻塞读取(如 recv
和 fread
)。显然,我的双端队列需要某种互斥,并且我需要某种通知系统来通知 read
重试。
我听说条件变量是可行的方法,但我所有的多线程经验都来自于 Windows,我很难全神贯注于它们。
感谢您的帮助!
(是的,我知道返回 vector 效率低下。我们不谈这个。)
最佳答案
此代码尚未准备好投入生产。 不会对任何库调用的结果进行错误检查。
我在 LockThread 中封装了互斥锁的锁定/解锁,因此它是异常安全的。但仅此而已。
此外,如果我认真地这样做,我会将互斥量和条件变量包装在对象中,这样它们就不会在 Consumer 的其他方法中被滥用。但是只要您注意到必须在使用条件变量(以任何方式)之前获取锁,那么这个简单的情况就可以保持原样。
出于兴趣,您是否检查过 boost 线程库?
#include <iostream>
#include <vector>
#include <pthread.h>
class LockThread
{
public:
LockThread(pthread_mutex_t& m)
:mutex(m)
{
pthread_mutex_lock(&mutex);
}
~LockThread()
{
pthread_mutex_unlock(&mutex);
}
private:
pthread_mutex_t& mutex;
};
class Consumer
{
pthread_mutex_t lock;
pthread_cond_t cond;
std::vector<char> unreadData;
public:
Consumer()
{
pthread_mutex_init(&lock,NULL);
pthread_cond_init(&cond,NULL);
}
~Consumer()
{
pthread_cond_destroy(&cond);
pthread_mutex_destroy(&lock);
}
private:
std::vector<char> read(size_t n)
{
LockThread locker(lock);
while (unreadData.size() < n)
{
// Must wait until we have n char.
// This is a while loop because feed may not put enough in.
// pthread_cond() releases the lock.
// Thread will not be allowed to continue until
// signal is called and this thread reacquires the lock.
pthread_cond_wait(&cond,&lock);
// Once released from the condition you will have re-aquired the lock.
// Thus feed() must have exited and released the lock first.
}
/*
* Not sure if this is exactly what you wanted.
* But the data is copied out of the thread safe buffer
* into something that can be returned.
*/
std::vector<char> result(n); // init result with size n
std::copy(&unreadData[0],
&unreadData[n],
&result[0]);
unreadData.erase(unreadData.begin(),
unreadData.begin() + n);
return (result);
}
public:
void run()
{
read(10);
read(4839);
// etc
}
void feed(const std::vector<char> &more)
{
LockThread locker(lock);
// Once we acquire the lock we can safely modify the buffer.
std::copy(more.begin(),more.end(),std::back_inserter(unreadData));
// Only signal the thread if you have the lock
// Otherwise race conditions happen.
pthread_cond_signal(&cond);
// destructor releases the lock and thus allows read thread to continue.
}
};
int main()
{
Consumer c;
}
关于c++ - 如何使用 POSIX 线程实现阻塞读取,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/206857/