c++ - 使用 Peterson 的 N 进程算法的信号量实现

标签 c++ multithreading algorithm operating-system semaphore

我需要对我的代码的反馈以执行以下声明,我走的路对吗?

问题陈述:

一个。实现一个具有私有(private) int 和三个公共(public)方法的信号量类:init、wait 和 signal。 wait 和 signal 方法的行为应与信号量的预期一致,并且在其实现中必须使用 Peterson 的 N 进程算法。

编写一个程序,创建 5 个并发更新共享整数值的线程,并使用 a) 部分创建的信号量类对象来确保并发更新的正确性。

这是我的工作程序:

#include <iostream>
#include <pthread.h>

using namespace std;

pthread_mutex_t mid;                   //muted id
int shared=0;                           //global shared variable
class semaphore {
   int counter;
public:
   semaphore(){
   }
   void init(){
       counter=1;                     //initialise counter 1 to get first thread access
   }
   void wait(){
       pthread_mutex_lock(&mid);         //lock the mutex here
       while(1){
           if(counter>0){             //check for counter value
               counter--;             //decrement counter
               break;                   //break the loop
           }

       }
       pthread_mutex_unlock(&mid);       //unlock mutex here
   }
   void signal(){
       pthread_mutex_lock(&mid);       //lock the mutex here
           counter++;                   //increment counter
           pthread_mutex_unlock(&mid);   //unlock mutex here
       }

};
semaphore sm;
void* fun(void* id)
{
   sm.wait();                           //call semaphore wait
   shared++;                           //increment shared variable
   cout<<"Inside thread "<<shared<<endl;
   sm.signal();                       //call signal to semaphore
}


int main() {

   pthread_t id[5];                   //thread ids for 5 threads
   sm.init();
   int i;
   for(i=0;i<5;i++)                   //create 5 threads
   pthread_create(&id[i],NULL,fun,NULL);
   for(i=0;i<5;i++)
   pthread_join(id[i],NULL);           //join 5 threads to complete their task
   cout<<"Outside thread "<<shared<<endl;//final value of shared variable
   return 0;
}

最佳答案

您需要在等待循环中旋转时释放互斥锁。

测试恰好有效,因为线程很可能在有任何上下文切换之前就开始运行它们的函数,因此每个线程甚至在下一个线程开始之前就完成了。所以你对信号量没有争用。如果你这样做了,他们会被一个服务员卡住,并持有互斥锁,阻止任何人访问柜台,从而释放旋转器。

这是一个有效的示例(尽管它可能仍然存在导致其偶尔无法正确启动的初始化竞争)。看起来比较复杂,主要是因为它使用了gcc内置的原子操作。只要您拥有多个内核,就需要这些,因为每个内核都有自己的缓存。将计数器声明为“volatile”仅有助于编译器优化 - 对于有效的 SMP,缓存一致性需要跨处理器缓存失效,这意味着需要使用特殊的处理器指令。您可以尝试用例如替换它们counter++ 和 counter--(“共享”也一样)——观察它在多核 CPU 上如何不起作用。 (有关 gcc 原子操作的更多详细信息,请参阅 https://gcc.gnu.org/onlinedocs/gcc-4.8.2/gcc/_005f_005fatomic-Builtins.html)

#include <stdio.h>
#include <pthread.h>
#include <unistd.h>
#include <stdint.h>


class semaphore {
    pthread_mutex_t lock;
    int32_t counter;
public:
   semaphore() {
       init();

   }
   void init() {
       counter = 1;           //initialise counter 1 to get first access
   }

   void spinwait() {
       while (true) {
           // Spin, waiting until we see a positive counter
           while (__atomic_load_n(&counter, __ATOMIC_SEQ_CST) <= 0)
               ;

           pthread_mutex_lock(&lock);
           if (__atomic_load_n(&counter, __ATOMIC_SEQ_CST) <= 0) {
               // Someone else stole the count from under us or it was
               // a fluke - keep trying
               pthread_mutex_unlock(&lock);
               continue;
           }
           // It's ours
           __atomic_fetch_add(&counter, -1, __ATOMIC_SEQ_CST);
           pthread_mutex_unlock(&lock);
           return;
       }
   }

   void signal() {
       pthread_mutex_lock(&lock);   //lock the mutex here
       __atomic_fetch_add(&counter, 1, __ATOMIC_SEQ_CST);
       pthread_mutex_unlock(&lock); //unlock mutex here
   }

};

enum { 
    NUM_TEST_THREADS = 5,
    NUM_BANGS = 1000
 };

// Making semaphore sm volatile would be complicated, because the
// pthread_mutex library calls don't expect volatile arguments.

int shared = 0;       // Global shared variable
semaphore sm;         // Semaphore protecting shared variable

volatile int num_workers = 0;   // So we can wait until we have N threads


void* fun(void* id)
{
    usleep(100000);                   // 0.1s. Encourage context switch.


    const int worker = (intptr_t)id + 1;

    printf("Worker %d ready\n", worker);

    // Spin, waiting for all workers to be in a runnable state.  These printouts
    // could be out of order.
    ++num_workers;
    while (num_workers < NUM_TEST_THREADS)
        ;

    // Go!

    // Bang on the semaphore.  Odd workers increment, even decrement.
    if (worker & 1) {
        for (int n = 0; n < NUM_BANGS; ++n) {
            sm.spinwait();
            __atomic_fetch_add(&shared, 1, __ATOMIC_SEQ_CST);
            sm.signal();
        }
    } else {
        for (int n = 0; n < NUM_BANGS; ++n) {
            sm.spinwait();
            __atomic_fetch_add(&shared, -1, __ATOMIC_SEQ_CST);
            sm.signal();
        }
    }

    printf("Worker %d done\n", worker);

    return NULL;
}


int main() {

    pthread_t id[NUM_TEST_THREADS]; //thread ids

    // create test worker threads
    for(int i = 0; i < NUM_TEST_THREADS; i++)
        pthread_create(&id[i], NULL, fun, (void*)((intptr_t)(i)));

    // join threads to complete their task
    for(int i = 0; i < NUM_TEST_THREADS; i++)
        pthread_join(id[i], NULL);

    //final value of shared variable.  For an odd number of
    // workers this is the loop count, NUM_BANGS
    printf("Test done.  Final value: %d\n", shared);
    const int expected = (NUM_TEST_THREADS & 1) ? NUM_BANGS : 0;
    if (shared == expected) {
        puts("PASS");
    } else {
        printf("Value expected was: %d\nFAIL\n", expected);
    }

    return 0;
}

关于c++ - 使用 Peterson 的 N 进程算法的信号量实现,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47586658/

相关文章:

c++ - 适用于大图的数据结构

c++ - 存储反转数据的 vector

java - 将算法从 O(n^3) 优化为 O(n^2)

c++ - Libnodave - 使用 TCP 连接的 daveStart() 错误

java - 尝试在 Java 中使用多线程对 Arraylist 的数字求和时出现 ConcurrentModificationException

c++ - 单处理器上的其他线程是否可以看到内存重新排序?

c - 如何让 MPI 中的所有等级向等级 0 发送一个值,然后阻塞接收所有等级?

arrays - 在常量空间中交错数组

algorithm - 将其从递归更改为迭代

c++ - boost::python 混合嵌入/暴露:如何获取 globals() 并查看我自己的模块?