我实现了自己的循环缓冲区。我想用两个线程来测试它。一个线程不断写入缓冲区,而另一个线程不断读取缓冲区。当读取到一定量的数据时,打印出基准。
此循环缓冲区的目的是写入和读取永远不会访问同一内存,因此不会引发竞争。
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <time.h>
#include <unistd.h>
#define MAX_NUM_ITEM 10000
#define HASH_SIZE 32
long write_count = 0;
long read_count = 0;
struct cBuf {
int first;
int last;
int max_items;
int item_size;
int valid_items;
unsigned char *buffer;
};
void init_cBuf(struct cBuf *buf, int max_items, int item_size) {
buf -> first = 0;
buf -> last = 0;
buf -> max_items = max_items;
buf -> item_size = item_size;
buf -> valid_items = 0;
buf -> buffer = calloc(max_items, item_size);
return;
}
int isEmpty(struct cBuf *buf) {
if (buf -> valid_items == 0) {
return 1;
}
else {
return 0;
}
}
int push(struct cBuf *buf, unsigned char *data) {
if (buf -> valid_items >= buf -> max_items) {
// buffer full
return -1;
}
else {
// push data into the buffer
memcpy(buf -> buffer + (buf -> last) * (buf -> item_size), data, buf -> item_size);
// update cBuf info
buf -> valid_items++;
buf -> last = (buf -> last + 1) % (buf -> max_items);
return 0;
}
}
int pop(struct cBuf *buf, unsigned char *new_buf) {
if (isEmpty(buf)) {
// buffer empty
return -1;
}
else {
// read data
memcpy(new_buf, buf -> buffer + (buf -> first) * (buf -> item_size), buf -> item_size);
// update cBuf info
buf -> first = (buf -> first + 1) % (buf -> max_items);
buf -> valid_items--;
return 0;
}
}
void *write_hash(void *ptr) {
struct cBuf *buf = (struct cBuf *)(ptr);
while (1) {
unsigned char *hash = malloc(HASH_SIZE); // for simplicity I just create some data with 32-byte.
if (push(buf, hash) == 0) {
write_count++;
//printf("put %lu items into the buffer. valid_items: %d\n", write_count, buf -> valid_items);
}
free(hash);
if (write_count == MAX_NUM_ITEM) {
break;
}
}
printf (" thread id = %lu\n", (long unsigned) (pthread_self ()));
printf (" total write = %lu\n\n", write_count);
return NULL;
}
void *read_hash(void *ptr) {
struct cBuf *buf = (struct cBuf *)(ptr);
unsigned char *new_buf = malloc(HASH_SIZE);
while (1) {
if (pop(buf, new_buf) == 0) {
read_count++;
//printf("pop %lu items from the buffer. valid_items: %d\n", read_count, buf -> valid_items);
}
if (read_count == MAX_NUM_ITEM) {
break;
}
}
free(new_buf);
printf (" thread id = %lu\n", (long unsigned) (pthread_self ()));
printf (" total read = %lu\n\n", read_count);
}
int main(int argc, char const *argv[]) {
struct cBuf buf;
init_cBuf(&buf, 200, HASH_SIZE);
pthread_t write_thd, read_thd;
double diff = 0.0, t1 = 0.0, t2 = 0.0;
t1 = clock ();
pthread_create(&read_thd, NULL, read_hash, &buf);
pthread_create(&write_thd, NULL, write_hash, &buf);
pthread_join(write_thd, NULL);
pthread_join(read_thd, NULL);
t2 = clock ();
diff = (double)((t2 - t1) / CLOCKS_PER_SEC);
printf ("----------------\nTotal time: %lf second\n", diff);
printf ("Total write: %lu\n", write_count);
printf ("write per-second: %lf\n\n", write_count / diff);
return 0;
}
如果我将函数“write_hash”中的“printf”和“read_hash”注释掉,程序永远不会终止,这很奇怪。如果我取消注释这两个“printf”,程序将打印出所有push和pop信息直到最后,最终程序成功退出。
请帮我看看是否是因为我的循环缓冲区实现有问题,或者其他地方有问题。
最佳答案
Luke,就像 EOF 所说的那样,您的代码中将会出现竞争条件(可能有几个)。以下是我看到您的代码时的一些想法:
当线程共享内存时,您需要对该内存进行保护,以便一次只有一个线程可以访问它。您可以使用互斥锁来完成此操作(请参阅 pthread_mutex_lock 和 pthread_mutex_unlock)。
此外,我注意到读写线程中的 if 语句会检查推送和弹出的结果,以查看字符是否实际被推送或弹出。虽然这可能有效,但您可能应该使用信号量来同步线程。我的意思是这样的:当你的写入线程写入缓冲区时,你应该有类似 sem_post(&buff_count_sem);
的内容。 。然后,在您的读取线程读取字符之前,您应该有 sem_wait(&buff_count_sem);
这样您就知道缓冲区中至少有一个字符。
同样的原理也适用于读取线程。我建议阅读线程有 sem_post(&space_left_sem)
就在从缓冲区弹出一个字节并且写入线程具有sem_wait(&space_left_sem)
之后在推送到缓冲区之前,确保缓冲区中有空间容纳您尝试写入的字节。
以下是我对代码的建议更改(未经测试):
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <time.h>
#include <unistd.h>
#define MAX_NUM_ITEM 10000
#define HASH_SIZE 32
long write_count = 0;
long read_count = 0;
pthread_mutex_t buff_mutex;
sem_t buff_count_sem;
sem_t space_left_sem;
struct cBuf {
int first;
int last;
int max_items;
int item_size;
int valid_items;
unsigned char *buffer;
};
void init_cBuf(struct cBuf *buf, int max_items, int item_size) {
buf -> first = 0;
buf -> last = 0;
buf -> max_items = max_items;
buf -> item_size = item_size;
buf -> valid_items = 0;
buf -> buffer = calloc(max_items, item_size);
return;
}
int isEmpty(struct cBuf *buf) {
if (buf -> valid_items == 0) {
return 1;
}
else {
return 0;
}
}
int push(struct cBuf *buf, unsigned char *data) {
pthread_mutex_lock(&buff_mutex);
if (buf -> valid_items >= buf -> max_items) {
// buffer full
pthread_mutex_unlock(&buff_mutex);
return -1;
}
else {
// push data into the buffer
memcpy(buf -> buffer + (buf -> last) * (buf -> item_size), data, buf -> item_size);
// update cBuf info
buf -> valid_items++;
buf -> last = (buf -> last + 1) % (buf -> max_items);
pthread_mutex_unlock(&buff_mutex);
return 0;
}
}
int pop(struct cBuf *buf, unsigned char *new_buf) {
phthread_mutex_lock(&buff_mutex);
if (isEmpty(buf)) {
// buffer empty
pthread_mutex_unlock(&buff_mutex);
return -1;
}
else {
// read data
memcpy(new_buf, buf -> buffer + (buf -> first) * (buf -> item_size), buf -> item_size);
// update cBuf info
buf -> first = (buf -> first + 1) % (buf -> max_items);
buf -> valid_items--;
pthread_mutex_unlock(&buff_mutex);
return 0;
}
}
void *write_hash(void *ptr) {
struct cBuf *buf = (struct cBuf *)(ptr);
while (1) {
unsigned char *hash = malloc(HASH_SIZE); // for simplicity I just create some data with 32-byte.
sem_wait(&space_left_sem);
push(buf, hash);
write_count++;
sem_post(&buff_count_sem);
free(hash);
if (write_count == MAX_NUM_ITEM) {
break;
}
}
printf (" thread id = %lu\n", (long unsigned) (pthread_self ()));
printf (" total write = %lu\n\n", write_count);
return NULL;
}
void *read_hash(void *ptr) {
struct cBuf *buf = (struct cBuf *)(ptr);
unsigned char *new_buf = malloc(HASH_SIZE);
while (1) {
sem_wait(&buff_count_sem);
pop(buf, new_buf);
read_count++;
sem_post(&space_left_sem);
if (read_count == MAX_NUM_ITEM) {
break;
}
}
free(new_buf);
printf (" thread id = %lu\n", (long unsigned) (pthread_self ()));
printf (" total read = %lu\n\n", read_count);
}
int main(int argc, char const *argv[]) {
struct cBuf buf;
init_cBuf(&buf, 200, HASH_SIZE);
pthread_t write_thd, read_thd;
pthread_mutex_init(&buff_mutex);
sem_init(&buff_count_sem, 0, 0); // the last parameter is the initial value - initially 0 if no data is in the buffer
sem_init(&space_left_sem, 0, buf.max_items);
double diff = 0.0, t1 = 0.0, t2 = 0.0;
t1 = clock ();
pthread_create(&read_thd, NULL, read_hash, &buf);
pthread_create(&write_thd, NULL, write_hash, &buf);
pthread_join(write_thd, NULL);
pthread_join(read_thd, NULL);
t2 = clock ();
diff = (double)((t2 - t1) / CLOCKS_PER_SEC);
printf ("----------------\nTotal time: %lf second\n", diff);
printf ("Total write: %lu\n", write_count);
printf ("write per-second: %lf\n\n", write_count / diff);
return 0;
}
我建议阅读如何正确使用线程。
更新: 1.修正了一个错字。 2. 您还需要在 isEmpty 以及为多个线程可能访问同一缓冲区的循环缓冲区编写的任何其他函数中使用 mutex_lock 和 mutex_unlock。
关于C 测试问题中的循环缓冲区实现,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38111220/