具有读取器只读访问权限的循环缓冲区

标签 c circular-buffer

我想构建一个读取器只有只读访问权限的循环缓冲区,但遇到了问题。为了实现平滑的翻转,我让写入器把翻转数据结构中 iterator+1 位置的 id 设置为 0,读取器再据此进行检查。在第一次翻转之前,我的算法似乎工作正常,但之后由于某种原因,reader 会从 writer 明显已经设置过的 id 中读到 0。我有一些可编译的示例代码来演示这里的问题:

#include <stdio.h>
#include <time.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <unistd.h>

// Number of alarm_t slots allocated for the ring buffer (see calloc in main).
#define NUM_ALM 5
// Status codes returned by next_alarm_read(); main() also returns ERROR
// when pthread_create() fails.
#define ERROR   -1
#define OK      0

// Alarm identifiers passed as the id argument of alarm_add().
//even IDs = alarm active
//odd IDs  = alarm clear
// 0 is not used as an ID here: alarm_add() writes alarmid 0 into the slot
// after the newest entry and next_alarm_read() treats 0 as "nothing to read".
enum alarmid {
    BFD_ACT     = 0x02,
    BFD_CLR     = 0x03,
    LOS_ACT     = 0x0C
};
// One ring-buffer entry.
typedef struct alarm_s {
    long timestamp;     // time(NULL) taken when alarm_add() built the entry
    int alarmid;        // alarm ID; 0 marks a slot with nothing to read
    int arg1;           // caller-supplied argument, stored unchanged
    int arg2;           // caller-supplied argument, stored unchanged
}alarm_t;
// Writer entry point, reader helper and reader thread body (defined below).
int alarm_add(int id, int arg1, int arg2);
int next_alarm_read(alarm_t *res);
void *alarm_reader(void *arg);

// Ring buffer storage: NUM_ALM entries, allocated in main().
static alarm_t *roller;
// Condition variable / mutex pair used by alarm_reader() to wait for the
// writer's pthread_cond_signal().
// NOTE(review): neither object has a static initializer and no
// pthread_cond_init()/pthread_mutex_init() call appears in this listing.
pthread_cond_t cv;
pthread_mutex_t mutex;
// Test driver: pre-loads four alarms, starts the reader thread, then queues
// nine more alarms from the main thread and waits for the reader to exit.
int main (void)
{
    int i =0;
    alarm_t dat;            // not used in this function
    pthread_t reader;
    int ret;

    // calloc zero-fills the ring, so every slot starts with alarmid == 0
    // NOTE(review): the result is not checked for NULL
    roller = calloc(NUM_ALM,sizeof(alarm_t));
    // integer division: a buffer smaller than 1024 bytes is reported as 0kB
    // NOTE(review): the argument is a size_t but the conversion is %lu;
    // %zu is the size_t conversion -- confirm for the target platform
    printf("allocated memory: %lukB\n",(sizeof(alarm_t)*NUM_ALM)/1024);

    // queue NUM_ALM-1 alarms (arg1 = 1..4) before the reader thread exists
    for (i = 1; i< NUM_ALM; i++){
        alarm_add(LOS_ACT,i,0);
    }
    ret = pthread_create(&reader,NULL,alarm_reader,NULL);
    if (ret){
        printf("Error - pthread_create() return code: %d\n",ret);
        return ERROR;
    }
    // give the reader one second before the next burst of alarms
    sleep(1);
    // nine back-to-back alarms; nothing here waits for the reader between
    // calls, and the ring only has NUM_ALM slots
    alarm_add(BFD_ACT,8,0);
    alarm_add(BFD_ACT,8,0);
    alarm_add(BFD_ACT,8,0);
    alarm_add(BFD_ACT,8,0);
    alarm_add(BFD_CLR,8,0);
    alarm_add(BFD_CLR,8,0);
    alarm_add(BFD_CLR,8,0);
    alarm_add(BFD_CLR,8,0);
    alarm_add(BFD_ACT,8,0);

    // blocks until alarm_reader() returns
    pthread_join(reader,NULL);
}

// Reader thread body: prints every alarm next_alarm_read() hands back and
// waits on cv whenever it reports ERROR. The loop ends once three such waits
// have completed (err reaches 3). The arg parameter is not used.
// NOTE(review): the function is declared to return void * but has no return
// statement.
void *alarm_reader(void *arg)
{
    static alarm_t dat={0};
    int err = 0;
    while(err <= 2)
    {
        if (next_alarm_read(&dat)== OK)
            printf("read alarm id %d, arg1 %d,arg2 %d\n",dat.alarmid,dat.arg1,dat.arg2);
        else{
            printf("alarm_reader() next_alarm_read() returned ERROR, wait\n");
            // NOTE(review): the wait is not wrapped in a predicate loop; a
            // pthread_cond_signal() issued while this thread is not blocked
            // here has no waiter to wake
            pthread_mutex_lock(&mutex);
            pthread_cond_wait(&cv, &mutex);
            pthread_mutex_unlock(&mutex);

            err++;
        }
    }
    printf("alarm_reader exit!\n");
}
// Writer: stores one alarm at the current write position, marks the following
// slot as empty (alarmid 0) and signals cv. Always returns 0.
// The write position is a function-local static, so the reader cannot see it,
// and the mutex is not taken while the ring is modified.
int alarm_add(int id, int arg1, int arg2)
{
    static int i = 0;       // write index, persists across calls
    alarm_t dat={0};
    if (i<NUM_ALM){
        dat.timestamp = time(NULL);
        dat.alarmid = id;
        dat.arg1 = arg1;
        dat.arg2 = arg2;

        // the address of an array element is being tested here, not its
        // contents
        if (&roller[i]){
            memcpy(&roller[i],&dat,sizeof(alarm_t));
            // clear the ID of the next slot (wrapping to slot 0) so the reader
            // stops there; this is done whether or not that slot has been read
            if (i+1<NUM_ALM)
                roller[i+1].alarmid = 0;
            else
                roller[0].alarmid = 0;
            pthread_cond_signal(&cv);
            printf("added id %d, arg1 %d, arg2 %d @%d\n",roller[i].alarmid,roller[i].arg1,roller[i].arg2,i);
            i++;
        }
    } else {
        // i == NUM_ALM: this call only rewinds the index; the alarm passed in
        // is not stored anywhere
        i = 0;
    }
    return 0;
}

// Reader helper: copies the entry at the current read position into *res.
// Returns ERROR when res is NULL or when the slot's alarmid is 0, OK otherwise.
// The read position is a function-local static and the ring is read without
// taking the mutex.
int next_alarm_read(alarm_t *res)
{
    static int i = 0;           // read index, persists across calls
    static long prev_time = 0;  // assigned below but never read here
    if (!res)
        return ERROR;

    if (i<NUM_ALM)
    {
        if (roller[i].alarmid!=0){
            printf("next_alarm_read() reading @%d\n",i);
            res->timestamp = roller[i].timestamp;
            res->alarmid = roller[i].alarmid;
            res->arg1 = roller[i].arg1;
            res->arg2 = roller[i].arg2;
            prev_time = roller[i].timestamp;
            i++;
        } else {
            printf("next_alarm_read() @%d is %d,return ERROR\n",i,roller[i].alarmid);

            return ERROR;
        }
    } else {
        // i == NUM_ALM: only rewind the index; *res is left untouched, yet OK
        // is returned below, so the caller sees the previous contents again
        i = 0;
    }
    return OK;
}

输出看起来像:

added id 12, arg1 1, arg2 0 @0
added id 12, arg1 2, arg2 0 @1
added id 12, arg1 3, arg2 0 @2
added id 12, arg1 4, arg2 0 @3
next_alarm_read() reading @0
read alarm id 12, arg1 1,arg2 0
next_alarm_read() reading @1
read alarm id 12, arg1 2,arg2 0
next_alarm_read() reading @2
read alarm id 12, arg1 3,arg2 0
next_alarm_read() reading @3
read alarm id 12, arg1 4,arg2 0
next_alarm_read() @4 is 0,return ERROR
alarm_reader() next_alarm_read() returned ERROR, wait
added id 2, arg1 8, arg2 0 @4
added id 2, arg1 8, arg2 0 @0
added id 2, arg1 8, arg2 0 @1
added id 3, arg1 8, arg2 0 @2
added id 3, arg1 8, arg2 0 @3
added id 3, arg1 8, arg2 0 @4
added id 2, arg1 8, arg2 0 @0
next_alarm_read() reading @4
read alarm id 3, arg1 8,arg2 0
read alarm id 3, arg1 8,arg2 0
next_alarm_read() reading @0
read alarm id 2, arg1 8,arg2 0
next_alarm_read() @1 is 0,return ERROR
alarm_reader() next_alarm_read() returned ERROR, wait

输出底部的那行 “next_alarm_read() @1 is 0,return ERROR” 是错误的,此处的 id 应该是 2。我想知道为什么这不能按预期工作?

最佳答案

几个问题:

我不确定 if (&roller[i]) 应该做什么/意味着什么。

main 中的 sleep 并不是真正需要的,我怀疑这是为了改善下面的其他问题。

alarm_add 会在翻转点丢弃一个条目(当索引到达 NUM_ALM 时,该次调用只把索引重置为 0,并不存储传入的告警)。

此外,写入器可能会超过读取器,在读取器看到某个条目之前就把它覆盖掉(即竞争条件)。

读取器和写入器都需要查看彼此的当前队列索引(即它们不应该是函数作用域 static)以防止溢出/竞争

应该有两个条件变量,而不是只有一个:

  1. 写入者检测到队列已满,需要阻塞,直到读取者排空一个条目
  2. 读取器检测到一个空队列,需要阻塞直到写入器添加新条目

这是您的代码的重构版本,应该可以解决这些问题。我添加了一些调试代码。它可能并不完美 [并且可能会偏向于保守主义],但它应该会让你走得更远 [请原谅不必要的样式清理]:

#include <stdio.h>
#include <time.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <unistd.h>

// Number of alarm_t slots in the ring. queue_notfull() reports "full" when
// advancing the write index would reach the read index, so at most
// NUM_ALM-1 entries are queued at once.
#define NUM_ALM 5
// main() returns ERROR when pthread_create() fails.
#define ERROR   -1
#define OK      0

// Time origin subtracted by tvgetf(); main() sets it from the first reading.
double tvzero;

// Alarm identifiers passed as the id argument of alarm_add().
//even IDs = alarm active
//odd IDs  = alarm clear
enum alarmid {
    BFD_ACT = 0x02,
    BFD_CLR = 0x03,
    LOS_ACT = 0x0C
};

// One ring-buffer entry; next_alarm_read() copies it out by value.
typedef struct alarm_s {
    long timestamp;     // time(NULL) taken when alarm_add() stored the entry
    int alarmid;        // alarm ID as passed to alarm_add()
    int arg1;           // caller-supplied argument, stored unchanged
    int arg2;           // caller-supplied argument, stored unchanged
} alarm_t;

// Writer entry point, reader helper and reader thread body (defined below).
void alarm_add(int id, int arg1, int arg2);
int next_alarm_read(alarm_t * res);
void *alarm_reader(void *arg);

// Ring buffer storage: NUM_ALM entries, allocated in main().
static alarm_t *roller;
// Single lock guarding the ring, both indexes and all flags below; every
// access in this file happens with it held.
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

// Both condition variables get a static initializer: POSIX leaves the use of
// an uninitialized pthread_cond_t undefined, and zero-filled static storage
// is not guaranteed to be a valid condition variable.

// reader variables
pthread_cond_t cv_notempty = PTHREAD_COND_INITIALIZER; // writer signals when queue not empty
volatile int need_notempty;             // reader sets this before waiting
volatile int idxdeq;                    // reader's queue index

// writer variables
pthread_cond_t cv_notfull = PTHREAD_COND_INITIALIZER;  // reader signals when queue not full
volatile int need_notfull;              // writer sets this before waiting
volatile int idxenq;                    // writer's queue index

// set by main() once everything has been queued; tells the reader to stop
// after it has drained the ring
volatile int stopall;

// Returns the current CLOCK_REALTIME reading as fractional seconds, relative
// to tvzero (i.e. the absolute time while tvzero is still 0).
double
tvgetf(void)
{
    struct timespec now;

    clock_gettime(CLOCK_REALTIME, &now);

    // nanoseconds scaled to seconds, plus whole seconds, minus the origin
    return ((double) now.tv_nsec / 1e9 + (double) now.tv_sec) - tvzero;
}

// Trace hook used by the queue code; forwards the reason string to dbg().
#define DBG(_reason) dbg(_reason)

// Prints "[<seconds from tvgetf()>] <reason>" on stdout.
void
dbg(const char *reason)
{
    printf("[%.9f] %s\n", tvgetf(), reason);
}

// Test driver: pre-loads the ring, starts the reader thread, queues nine more
// alarms, then raises stopall and waits for the reader to drain and exit.
// Returns 0 on success, ERROR if the ring cannot be allocated or the reader
// thread cannot be created.
int
main(void)
{
    int i = 0;
    pthread_t reader;
    int ret;

    // tvzero is still 0 here, so this captures the absolute start time
    tvzero = tvgetf();

    roller = calloc(NUM_ALM, sizeof(alarm_t));
    if (roller == NULL) {
        fprintf(stderr, "Error - calloc() failed\n");
        return ERROR;
    }
    // sizeof yields a size_t, whose printf conversion is %zu
    printf("allocated memory: %zukB\n", (sizeof(alarm_t) * NUM_ALM) / 1024);

    // NOTE: queuing more than a full queue here will cause writer to block
    // forever because reader is not yet started
    for (i = 1; i < NUM_ALM; i++) {
        alarm_add(LOS_ACT, i, 0);
    }

    ret = pthread_create(&reader, NULL, alarm_reader, NULL);
    if (ret) {
        printf("Error - pthread_create() return code: %d\n", ret);
        free(roller);
        roller = NULL;
        return ERROR;
    }

    // no sleep is needed before queuing more: alarm_add() blocks while the
    // ring is full
    alarm_add(BFD_ACT, 8, 0);
    alarm_add(BFD_ACT, 8, 0);
    alarm_add(BFD_ACT, 8, 0);
    alarm_add(BFD_ACT, 8, 0);
    alarm_add(BFD_CLR, 8, 0);
    alarm_add(BFD_CLR, 8, 0);
    alarm_add(BFD_CLR, 8, 0);
    alarm_add(BFD_CLR, 8, 0);
    alarm_add(BFD_ACT, 8, 0);

    // tell reader that all items are queued and it should stop when it
    // processes the final item
    pthread_mutex_lock(&mutex);
    stopall = 1;
    if (need_notempty)
        pthread_cond_signal(&cv_notempty);
    pthread_mutex_unlock(&mutex);

    pthread_join(reader, NULL);

    // the reader has exited, so nothing references the ring any more
    free(roller);
    roller = NULL;

    return 0;
}

// Reader side: claims the oldest queued slot and steps the dequeue index.
// Called by next_alarm_read() with mutex held.
// RETURNS: queue index to process (-1=empty)
int
queue_notempty(void)
{
    int slot = -1;

    do {
        // reader has caught up with the writer -- nothing queued
        if (idxdeq == idxenq)
            break;

        // hand out the current slot, then move on (wrapping at NUM_ALM)
        slot = idxdeq;
        idxdeq = (slot + 1) % NUM_ALM;
    } while (0);

    return slot;
}

// Writer side: claims the slot at the enqueue index and steps that index.
// Called by alarm_add() with mutex held.
// RETURNS: queue index to use (-1=full)
int
queue_notfull(void)
{
    int slot = idxenq;
    // where the enqueue index would land after this store (with wrap)
    int following = (slot + 1) % NUM_ALM;

    do {
        // stepping would collide with the reader's index -- queue is full
        if (following == idxdeq) {
            slot = -1;
            break;
        }

        // commit the advanced index
        idxenq = following;
    } while (0);

    return slot;
}

// Reader thread body: prints each dequeued alarm until next_alarm_read()
// reports stop (nonzero). The arg parameter is not used. Returns NULL.
void *
alarm_reader(void *arg)
{
    alarm_t dat = { 0 };

    while (next_alarm_read(&dat) == 0) {
        printf("read alarm id %d, arg1 %d,arg2 %d\n",
            dat.alarmid, dat.arg1, dat.arg2);
    }

    printf("alarm_reader exit!\n");

    return NULL;
}

void
alarm_add(int id, int arg1, int arg2)
{
    int curidx;
    alarm_t *rol;

    pthread_mutex_lock(&mutex);

    while (1) {
        curidx = queue_notfull();

        // have an open slot -- store item into it
        if (curidx >= 0) {
            rol = &roller[curidx];

            rol->timestamp = time(NULL);
            rol->alarmid = id;
            rol->arg1 = arg1;
            rol->arg2 = arg2;

            printf("added id %d, arg1 %d, arg2 %d @%d\n",
                rol->alarmid, rol->arg1, rol->arg2, curidx);

            // unblock reader if necessary
            if (need_notempty) {
                DBG("writer signal notempty");
                need_notempty = 0;
                pthread_cond_signal(&cv_notempty);
            }

            break;
        }

        // queue is full -- wait for reader to free up some space
        DBG("writer need_notfull");
        need_notfull = 1;
        pthread_cond_wait(&cv_notfull,&mutex);
        DBG("writer wakeup");
    }

    pthread_mutex_unlock(&mutex);
}

// Dequeues the next alarm into *res, blocking on cv_notempty while the ring
// is empty. Gives up once the ring is empty and stopall is set; *res is left
// untouched in that case.
// RETURNS: 1=stop, 0=normal
int
next_alarm_read(alarm_t *res)
{
    int slot;
    int stop = 0;

    pthread_mutex_lock(&mutex);

    while ((slot = queue_notempty()) < 0) {
        // stop when master has enqueued everything
        stop = stopall;
        if (stop)
            break;

        // queue is empty -- we must wait for writer to add something
        DBG("reader need_notempty");
        need_notempty = 1;
        pthread_cond_wait(&cv_notempty, &mutex);
    }

    // queue has an entry -- process it
    if (slot >= 0) {
        printf("next_alarm_read() reading @%d\n", slot);
        *res = roller[slot];

        // if writer is waiting/blocking, wake it up because we just
        // freed up a queue slot
        if (need_notfull) {
            DBG("reader signal notfull");
            need_notfull = 0;
            pthread_cond_signal(&cv_notfull);
        }
    }

    pthread_mutex_unlock(&mutex);

    return stop;
}

更新:

I don't understand the do while(0); "loops" in the two Q functions, can you elaborate a little, please?

do while(0) 是一种我经常用来替换 if/else 梯形逻辑的技术。它不是我发明的[它在一些风格指南中进行了讨论,特别是“Code Complete”],但我向它展示过的很多人似乎都喜欢它。查看我的回答:About the exclusiveness of the cases of an if block以获得更好的解释。

And I guess what my initial post didn't include is: the master should be able to enqueue things on an ongoing basis, there's no stopall and the reader should start reading as soon as something is available.

实际上,我确实意识到了这一点,而且我发布的代码也支持这一点。

您可能希望在对任何消息进行排队之前发出pthread_create,以防止我在代码注释中提到的潜在死锁。

A fix for this would be to remove stopall, the pthread_cond_signal() (from main) is already done inside alarm_add() so this should work fine.

stopall 不是用来同步上溢/下溢的。它只在写入者(主线程)希望接收者线程处理完毕并干净地停止时才使用。它更像是一种向读取器发送“EOF”条件的方式。

如果您的应用程序要“永远”运行,您可以删除stopall

或者,用一种更简洁的方式来发出“EOF”信号:主线程可以入队一条特殊的“停止”消息(例如,时间戳为 -1 的消息),告诉接收者今后不会再发送任何消息,并且我们希望终止程序。


我建议您添加“诊断模式”来验证您的程序:

main 执行 pthread_create 然后执行:

    for (i = 1; i < 10000000; i++) {
        alarm_add(LOS_ACT, i, 0);
    }

读者应该检查传入的 arg1 值。它们应该像上面那样递增。如果他们不这样做,则存在逻辑错误或竞争条件。


这是我的代码的更新版本,带有用于诊断/单元测试模式的 -D 选项。请注意,所有打印都被禁用以允许它以极快的速度运行:

#include <stdio.h>
#include <time.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <unistd.h>

// Number of alarm_t slots in the ring. queue_notfull() reports "full" when
// advancing the write index would reach the read index, so at most
// NUM_ALM-1 entries are queued at once.
#define NUM_ALM 5
// main() returns ERROR when pthread_create() fails.
#define ERROR   -1
#define OK      0

// Diagnostic mode, set by the -D option in main(): 0 = normal run with
// printing; nonzero = loop bound for the stress test, with prtf()/dbg()
// output suppressed.
int opt_diag;
// Time origin subtracted by tvgetf(); main() sets it from the first reading.
double tvzero;

// Alarm identifiers passed as the id argument of alarm_add().
//even IDs = alarm active
//odd IDs  = alarm clear
enum alarmid {
    BFD_ACT = 0x02,
    BFD_CLR = 0x03,
    LOS_ACT = 0x0C
};

// One ring-buffer entry; next_alarm_read() copies it out by value.
typedef struct alarm_s {
    long timestamp;     // time(NULL) taken when alarm_add() stored the entry
    int alarmid;        // alarm ID as passed to alarm_add()
    int arg1;           // caller-supplied; checked for sequence in -D mode
    int arg2;           // caller-supplied argument, stored unchanged
} alarm_t;

// Writer entry point, reader helper and reader thread body (defined below).
void alarm_add(int id, int arg1, int arg2);
int next_alarm_read(alarm_t * res);
void *alarm_reader(void *arg);

// Ring buffer storage: NUM_ALM entries, allocated in main().
static alarm_t *roller;
// Single lock guarding the ring, both indexes and all flags below; every
// access in this file happens with it held.
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

// Both condition variables get a static initializer: POSIX leaves the use of
// an uninitialized pthread_cond_t undefined, and zero-filled static storage
// is not guaranteed to be a valid condition variable.

// reader variables
pthread_cond_t cv_notempty = PTHREAD_COND_INITIALIZER; // writer signals when queue not empty
volatile int need_notempty;             // reader sets this before waiting
volatile int idxdeq;                    // reader's queue index

// writer variables
pthread_cond_t cv_notfull = PTHREAD_COND_INITIALIZER;  // reader signals when queue not full
volatile int need_notfull;              // writer sets this before waiting
volatile int idxenq;                    // writer's queue index

// set by main() once everything has been queued; tells the reader to stop
// after it has drained the ring
volatile int stopall;

// Returns the current CLOCK_REALTIME reading as fractional seconds, relative
// to tvzero (i.e. the absolute time while tvzero is still 0).
double
tvgetf(void)
{
    struct timespec now;

    clock_gettime(CLOCK_REALTIME, &now);

    // nanoseconds scaled to seconds, plus whole seconds, minus the origin
    return ((double) now.tv_nsec / 1e9 + (double) now.tv_sec) - tvzero;
}

// printf() wrapper that prints nothing while opt_diag is nonzero, so the
// diagnostic run is not slowed down by output. The arguments are not
// evaluated when output is suppressed.
// Written with the ISO C99 variadic form (... / __VA_ARGS__) instead of the
// GNU-only named form (_fmt...), so it builds on any conforming compiler.
// Callers must pass at least the format string.
#define prtf(...) \
    do { \
        if (opt_diag) \
            break; \
        printf(__VA_ARGS__); \
    } while (0)

// Trace hook used by the queue code; forwards the reason string to dbg().
#define DBG(_reason) dbg(_reason)

// Prints "[<seconds from tvgetf()>] <reason>" on stdout, unless diagnostic
// mode (opt_diag nonzero) is active, in which case it does nothing.
void
dbg(const char *reason)
{
    if (opt_diag)
        return;

    printf("[%.9f] %s\n", tvgetf(), reason);
}

// Test driver.
//   -D[count]  diagnostic mode: queue alarms with arg1 = 1..count-1 (count
//              defaults to 10000000) so the reader can verify ordering
// Without -D it pre-loads the ring, starts the reader, and queues the nine
// demo alarms. In both modes it then raises stopall and joins the reader.
// Returns 0 on success, ERROR if the ring cannot be allocated or the reader
// thread cannot be created.
int
main(int argc,char **argv)
{
    int i = 0;
    char *cp;
    pthread_t reader;
    int ret;

    // skip the program name
    --argc;
    ++argv;

    for (;  argc > 0;  --argc, ++argv) {
        cp = *argv;
        // first non-option argument ends option parsing
        if (*cp != '-')
            break;

        switch (cp[1]) {
        case 'D':
            cp += 2;
            opt_diag = (*cp != 0) ? atoi(cp) : 10000000;
            break;
        default:
            // previously ignored silently; keep going but say so
            fprintf(stderr, "warning: ignoring unknown option '%s'\n", cp);
            break;
        }
    }

    // tvzero is still 0 here, so this captures the absolute start time
    tvzero = tvgetf();

    roller = calloc(NUM_ALM, sizeof(alarm_t));
    if (roller == NULL) {
        fprintf(stderr, "Error - calloc() failed\n");
        return ERROR;
    }
    // sizeof yields a size_t, whose printf conversion is %zu
    printf("allocated memory: %zukB\n", (sizeof(alarm_t) * NUM_ALM) / 1024);

    // NOTE: queuing more than a full queue here will cause writer to block
    // forever because reader is not yet started
    if (! opt_diag) {
        for (i = 1; i < NUM_ALM; i++) {
            alarm_add(LOS_ACT, i, 0);
        }
    }

    ret = pthread_create(&reader, NULL, alarm_reader, NULL);
    if (ret) {
        printf("Error - pthread_create() return code: %d\n", ret);
        free(roller);
        roller = NULL;
        return ERROR;
    }

    // no sleep is needed before queuing more: alarm_add() blocks while the
    // ring is full
    if (opt_diag) {
        // strictly increasing arg1 values; alarm_reader() checks the sequence
        for (i = 1; i < opt_diag; i++) {
            alarm_add(LOS_ACT, i, 0);
        }
    }
    else {
        alarm_add(BFD_ACT, 8, 0);
        alarm_add(BFD_ACT, 8, 0);
        alarm_add(BFD_ACT, 8, 0);
        alarm_add(BFD_ACT, 8, 0);
        alarm_add(BFD_CLR, 8, 0);
        alarm_add(BFD_CLR, 8, 0);
        alarm_add(BFD_CLR, 8, 0);
        alarm_add(BFD_CLR, 8, 0);
        alarm_add(BFD_ACT, 8, 0);
    }

    // tell reader that all items are queued and it should stop when it
    // processes the final item
    pthread_mutex_lock(&mutex);
    stopall = 1;
    if (need_notempty)
        pthread_cond_signal(&cv_notempty);
    pthread_mutex_unlock(&mutex);

    pthread_join(reader, NULL);

    // the reader has exited, so nothing references the ring any more
    free(roller);
    roller = NULL;

    return 0;
}

// Reader side: claims the oldest queued slot and steps the dequeue index.
// Called by next_alarm_read() with mutex held.
// RETURNS: queue index to process (-1=empty)
int
queue_notempty(void)
{
    int slot = -1;

    do {
        // reader has caught up with the writer -- nothing queued
        if (idxdeq == idxenq)
            break;

        // hand out the current slot, then move on (wrapping at NUM_ALM)
        slot = idxdeq;
        idxdeq = (slot + 1) % NUM_ALM;
    } while (0);

    return slot;
}

// Writer side: claims the slot at the enqueue index and steps that index.
// Called by alarm_add() with mutex held.
// RETURNS: queue index to use (-1=full)
int
queue_notfull(void)
{
    int slot = idxenq;
    // where the enqueue index would land after this store (with wrap)
    int following = (slot + 1) % NUM_ALM;

    do {
        // stepping would collide with the reader's index -- queue is full
        if (following == idxdeq) {
            slot = -1;
            break;
        }

        // commit the advanced index
        idxenq = following;
    } while (0);

    return slot;
}

// Reader thread body: consumes alarms until next_alarm_read() reports stop
// (nonzero). In diagnostic mode each arg1 must equal the next value of a
// counter that starts at 1; a mismatch is reported and the process exits
// with status 1. The arg parameter is not used. Returns NULL.
void *
alarm_reader(void *arg)
{
    alarm_t dat = { 0 };
    static int expected = 1;

    while (!next_alarm_read(&dat)) {
        if (opt_diag) {
            // take the value this alarm must carry and move to the next one
            int want = expected++;

            if (dat.arg1 != want) {
                printf("expected: %d got %d\n", want, dat.arg1);
                exit(1);
            }
        }

        prtf("read alarm id %d, arg1 %d,arg2 %d\n",
            dat.alarmid, dat.arg1, dat.arg2);
    }

    printf("alarm_reader exit!\n");

    return NULL;
}

void
alarm_add(int id, int arg1, int arg2)
{
    int curidx;
    alarm_t *rol;

    pthread_mutex_lock(&mutex);

    while (1) {
        curidx = queue_notfull();

        // have an open slot -- store item into it
        if (curidx >= 0) {
            rol = &roller[curidx];

            rol->timestamp = time(NULL);
            rol->alarmid = id;
            rol->arg1 = arg1;
            rol->arg2 = arg2;

            prtf("added id %d, arg1 %d, arg2 %d @%d\n",
                rol->alarmid, rol->arg1, rol->arg2, curidx);

            // unblock reader if necessary
            if (need_notempty) {
                DBG("writer signal notempty");
                need_notempty = 0;
                pthread_cond_signal(&cv_notempty);
            }

            break;
        }

        // queue is full -- wait for reader to free up some space
        DBG("writer need_notfull");
        need_notfull = 1;
        pthread_cond_wait(&cv_notfull,&mutex);
        DBG("writer wakeup");
    }

    pthread_mutex_unlock(&mutex);
}

// Dequeues the next alarm into *res, blocking on cv_notempty while the ring
// is empty. Gives up once the ring is empty and stopall is set; *res is left
// untouched in that case.
// RETURNS: 1=stop, 0=normal
int
next_alarm_read(alarm_t *res)
{
    int slot;
    int stop = 0;

    pthread_mutex_lock(&mutex);

    while ((slot = queue_notempty()) < 0) {
        // stop when master has enqueued everything
        stop = stopall;
        if (stop)
            break;

        // queue is empty -- we must wait for writer to add something
        DBG("reader need_notempty");
        need_notempty = 1;
        pthread_cond_wait(&cv_notempty, &mutex);
    }

    // queue has an entry -- process it
    if (slot >= 0) {
        prtf("next_alarm_read() reading @%d\n", slot);
        *res = roller[slot];

        // if writer is waiting/blocking, wake it up because we just
        // freed up a queue slot
        if (need_notfull) {
            DBG("reader signal notfull");
            need_notfull = 0;
            pthread_cond_signal(&cv_notfull);
        }
    }

    pthread_mutex_unlock(&mutex);

    return stop;
}

这是添加了诊断选项的原始代码版本:

#include <stdio.h>
#include <time.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <unistd.h>

// Diagnostic mode, set by the -D option in main(): 0 = normal run with
// printing; nonzero = loop bound for the stress test, with prtf() output
// suppressed.
int opt_diag;

// Number of alarm_t slots allocated for the ring buffer (see calloc in main).
#define NUM_ALM 5
// Status codes returned by next_alarm_read(); main() also returns ERROR
// when pthread_create() fails.
#define ERROR   -1
#define OK      0

// Alarm identifiers passed as the id argument of alarm_add().
//even IDs = alarm active
//odd IDs  = alarm clear
// 0 is not used as an ID here: alarm_add() writes alarmid 0 into the slot
// after the newest entry and next_alarm_read() treats 0 as "nothing to read".
enum alarmid {
    BFD_ACT = 0x02,
    BFD_CLR = 0x03,
    LOS_ACT = 0x0C
};
// One ring-buffer entry.
typedef struct alarm_s {
    long timestamp;     // time(NULL) taken when alarm_add() built the entry
    int alarmid;        // alarm ID; 0 marks a slot with nothing to read
    int arg1;           // caller-supplied; checked for sequence in -D mode
    int arg2;           // caller-supplied argument, stored unchanged
} alarm_t;
// Writer entry point, reader helper and reader thread body (defined below).
int alarm_add(int id, int arg1, int arg2);
int next_alarm_read(alarm_t * res);
void *alarm_reader(void *arg);

// Ring buffer storage: NUM_ALM entries, allocated in main().
static alarm_t *roller;
// Condition variable / mutex pair used by alarm_reader() to wait for the
// writer's pthread_cond_signal().
// NOTE(review): neither object has a static initializer and no
// pthread_cond_init()/pthread_mutex_init() call appears in this listing.
pthread_cond_t cv;
pthread_mutex_t mutex;

// printf() wrapper that prints nothing while opt_diag is nonzero, so the
// diagnostic run is not slowed down by output. The arguments are not
// evaluated when output is suppressed.
// Written with the ISO C99 variadic form (... / __VA_ARGS__) instead of the
// GNU-only named form (_fmt...), so it builds on any conforming compiler.
// Callers must pass at least the format string.
#define prtf(...) \
    do { \
        if (opt_diag) \
            break; \
        printf(__VA_ARGS__); \
    } while (0)

// Test driver for the original (unsynchronized) ring buffer.
//   -D[count]  diagnostic mode: queue alarms with arg1 = 1..count-1 (count
//              defaults to 10000000) so the reader can verify ordering
// Without -D it pre-loads four alarms, starts the reader, sleeps one second
// and queues the nine demo alarms.
int
main(int argc,char **argv)
{
    int i = 0;
    char *cp;
    pthread_t reader;
    int ret;

    // skip the program name
    --argc;
    ++argv;

    for (;  argc > 0;  --argc, ++argv) {
        cp = *argv;
        // first non-option argument ends option parsing
        if (*cp != '-')
            break;

        // options other than -D fall through the switch and are ignored
        switch (cp[1]) {
        case 'D':
            cp += 2;
            opt_diag = (*cp != 0) ? atoi(cp) : 10000000;
            break;
        }
    }

    // calloc zero-fills the ring, so every slot starts with alarmid == 0
    // NOTE(review): the result is not checked for NULL
    roller = calloc(NUM_ALM, sizeof(alarm_t));
    // integer division: a buffer smaller than 1024 bytes is reported as 0kB
    // NOTE(review): the argument is a size_t but the conversion is %lu;
    // %zu is the size_t conversion -- confirm for the target platform
    printf("allocated memory: %lukB\n", (sizeof(alarm_t) * NUM_ALM) / 1024);

    // normal mode only: queue NUM_ALM-1 alarms before the reader exists
    if (! opt_diag) {
        for (i = 1; i < NUM_ALM; i++) {
            alarm_add(LOS_ACT, i, 0);
        }
    }

    ret = pthread_create(&reader, NULL, alarm_reader, NULL);
    if (ret) {
        printf("Error - pthread_create() return code: %d\n", ret);
        return ERROR;
    }

    if (opt_diag) {
        // strictly increasing arg1 values; alarm_reader() checks the sequence
        for (i = 1; i < opt_diag; i++) {
            alarm_add(LOS_ACT, i, 0);
        }
    }
    else {
        // give the reader one second before the next burst of alarms
        sleep(1);
        alarm_add(BFD_ACT, 8, 0);
        alarm_add(BFD_ACT, 8, 0);
        alarm_add(BFD_ACT, 8, 0);
        alarm_add(BFD_ACT, 8, 0);
        alarm_add(BFD_CLR, 8, 0);
        alarm_add(BFD_CLR, 8, 0);
        alarm_add(BFD_CLR, 8, 0);
        alarm_add(BFD_CLR, 8, 0);
        alarm_add(BFD_ACT, 8, 0);
    }

    // blocks until alarm_reader() returns
    pthread_join(reader, NULL);
}

// Reader thread body: handles every alarm next_alarm_read() hands back and
// waits on cv whenever it reports ERROR. The loop ends once three such waits
// have completed (err reaches 3). In diagnostic mode each arg1 must equal a
// counter that starts at 1 and increments per alarm; a mismatch is reported
// and the process exits with status 1. The arg parameter is not used.
void *
alarm_reader(void *arg)
{
    static alarm_t dat = { 0 };
    int expval = 1;         // next arg1 value expected in diagnostic mode
    int err = 0;            // number of completed waits after a failed read

    while (err <= 2) {
        if (next_alarm_read(&dat) == OK) {
            prtf("read alarm id %d, arg1 %d,arg2 %d\n", dat.alarmid, dat.arg1, dat.arg2);
            if (opt_diag) {
                if (dat.arg1 != expval) {
                    printf("expected: %d got %d\n",expval,dat.arg1);
                    exit(1);
                }
                ++expval;
            }
        }
        else {
            prtf("alarm_reader() next_alarm_read() returned ERROR, wait\n");
            // NOTE(review): the wait is not wrapped in a predicate loop; a
            // pthread_cond_signal() issued while this thread is not blocked
            // here has no waiter to wake
            pthread_mutex_lock(&mutex);
            pthread_cond_wait(&cv, &mutex);
            pthread_mutex_unlock(&mutex);

            err++;
        }
    }
    printf("alarm_reader exit!\n");

    return (void *) 0;
}

// Writer: stores one alarm at the current write position, marks the following
// slot as empty (alarmid 0) and signals cv. Always returns 0.
// The write position is a function-local static, so the reader cannot see it,
// and the mutex is not taken while the ring is modified.
int
alarm_add(int id, int arg1, int arg2)
{
    static int i = 0;       // write index, persists across calls
    alarm_t dat = { 0 };
    if (i < NUM_ALM) {
        dat.timestamp = time(NULL);
        dat.alarmid = id;
        dat.arg1 = arg1;
        dat.arg2 = arg2;

        // the address of an array element is being tested here, not its
        // contents
        if (&roller[i]) {
            memcpy(&roller[i], &dat, sizeof(alarm_t));
            // clear the ID of the next slot (wrapping to slot 0) so the reader
            // stops there; this is done whether or not that slot has been read
            if (i + 1 < NUM_ALM)
                roller[i + 1].alarmid = 0;
            else
                roller[0].alarmid = 0;
            pthread_cond_signal(&cv);
            prtf("added id %d, arg1 %d, arg2 %d @%d\n", roller[i].alarmid, roller[i].arg1, roller[i].arg2, i);
            i++;
        }
    }
    else {
        // i == NUM_ALM: this call only rewinds the index; the alarm passed in
        // is not stored anywhere
        i = 0;
    }
    return 0;
}

// Reader helper: copies the entry at the current read position into *res.
// Returns ERROR when res is NULL or when the slot's alarmid is 0, OK otherwise.
// The read position is a function-local static and the ring is read without
// taking the mutex.
int
next_alarm_read(alarm_t * res)
{
    static int i = 0;       // read index, persists across calls
    //static long prev_time = 0;

    if (!res)
        return ERROR;

    if (i < NUM_ALM) {
        if (roller[i].alarmid != 0) {
            prtf("next_alarm_read() reading @%d\n", i);
            res->timestamp = roller[i].timestamp;
            res->alarmid = roller[i].alarmid;
            res->arg1 = roller[i].arg1;
            res->arg2 = roller[i].arg2;
            //prev_time = roller[i].timestamp;
            i++;
        }
        else {
            prtf("next_alarm_read() @%d is %d,return ERROR\n", i, roller[i].alarmid);

            return ERROR;
        }
    }
    else {
        // i == NUM_ALM: only rewind the index; *res is left untouched, yet OK
        // is returned below, so the caller sees the previous contents again
        i = 0;
    }
    return OK;
}

关于具有读取器只读访问权限的循环缓冲区,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50883711/

相关文章:

c - 哪个库有幂函数?

c++ - 函数调用中char[]和char*的区别

c++ - 如何简洁地将一个数组的范围分配给另一个数组的范围?

c - 无需复制的 Windows 环形缓冲区

C程序使用指针数组对char数组进行排序

将 void** 转换为 int 的二维数组 - C

java - Apache Commons - CircularFifoBuffer BufferOverflowException

c - c 中的一个消费者多个生产者在缓冲区满后恢复时防止竞争

java - 向后滚动圆形数组

c++ - 如何在C++中使用无锁循环缓冲区实现零拷贝tcp