c - 跨多个文件线程

标签 c multithreading

我的程序正在读取文件并使用线程计算最高质数,当我将打印语句放入 getNum() 函数时,我的数字正在打印出来。但是,无论我输入多少线程,它似乎都会滞后。每个文件中都有 100 万个整数。有没有人看到我的代码明显有问题?基本上代码是给每个线程 1000 个整数,以便在分配新线程之前检查。我仍然是 C 新手,只是在学习线程的技巧。我的代码现在一团糟,因为我一直在不断地切换。

#include <stdio.h>  
#include <stdlib.h> 
#include <time.h>   
#include <string.h>
#include <pthread.h> 
#include <math.h>
#include <semaphore.h>

//Global variable declaration
char *file1 = "primes1.txt";
char *file2 = "primes2.txt";
char *file3 = "primes3.txt";
char *file4 = "primes4.txt";
char *file5 = "primes5.txt";
char *file6 = "primes6.txt";
char *file7 = "primes7.txt";
char *file8 = "primes8.txt";
char *file9 = "primes9.txt";
char *file10 = "primes10.txt";

char **fn; //file name variable

int numberOfThreads;
int *highestPrime = NULL;
int fileArrayNum = 0;
int loop = 0;

int currentFile = 0;


sem_t semAccess;
sem_t semAssign;

int prime(int n)//check for prime number, return 1 for prime 0 for nonprime
{
  int i;
  for(i = 2; i <= sqrt(n); i++)
    if(n % i == 0)
      return(0);

    return(1);
}

int getNum(FILE* file)
{
  int number;
  char* tempS = malloc(20 *sizeof(char));
  fgets(tempS, 20, file);
  tempS[strlen(tempS)-1] = '\0';
  number = atoi(tempS);


  free(tempS);//free memory for later call

  return(number);
}

void* findPrimality(void *threadnum) //main thread function to find primes
{
  int tNum = (int)threadnum;
  int checkNum;
  char *inUseFile = NULL;
  int x=1;

  FILE* file;
  while(currentFile < 10){

    if(inUseFile == NULL){//inUseFIle being used to check if a file is still being read

      sem_wait(&semAccess);//critical section     
      inUseFile = fn[currentFile];
      sem_post(&semAssign);
      file = fopen(inUseFile, "r");


      while(!feof(file)){
    if(x % 1000 == 0 && tNum !=1){ //go for 1000 integers and then wait
      sem_wait(&semAssign);
    }

    checkNum = getNum(file);
    /*
     * 
     * 
     * 
     * I think the issue is here
     * 
     * 
     * 
     */
    if(checkNum > highestPrime[tNum]){
      if(prime(checkNum)){
        highestPrime[tNum] = checkNum;
      }
    }

    x++;
      }
      fclose(file);
      inUseFile = NULL;
    }
    currentFile++;
  }
}

int main(int argc, char* argv[])
{

  if(argc != 2){ //checks for number of arguements being passed
printf("To many ARGS\n");
return(-1);
  }
  else{//Sets thread cound to user input checking for correct number of threads
    numberOfThreads = atoi(argv[1]);
    if(numberOfThreads < 1 || numberOfThreads > 10){
      printf("To many threads entered\n");
      return(-1);
    }

    time_t preTime, postTime; //creating time variables

    int i;

    fn = malloc(10 * sizeof(char*)); //create file array and initialize

    fn[0] = file1;
    fn[1] = file2;
    fn[2] = file3;
    fn[3] = file4;
    fn[4] = file5;
    fn[5] = file6;
    fn[6] = file7;
    fn[7] = file8;
    fn[8] = file9;
    fn[9] = file10;


    sem_init(&semAccess, 0, 1); //initialize semaphores
    sem_init(&semAssign, 0, numberOfThreads);

    highestPrime = malloc(numberOfThreads * sizeof(int)); //create an array to store each threads highest number

    for(loop = 0; loop < numberOfThreads; loop++){//set initial values to 0
      highestPrime[loop] = 0;   
    }

    pthread_t calculationThread[numberOfThreads]; //thread to do the work

    preTime = time(NULL); //start the clock

    for(i = 0; i < numberOfThreads; i++){
      pthread_create(&calculationThread[i], NULL, findPrimality, (void *)i);
    }

    for(i = 0; i < numberOfThreads; i++){
      pthread_join(calculationThread[i], NULL);
    }

    for(i = 0; i < numberOfThreads; i++){
      printf("this is a prime number: %d \n", highestPrime[i]);
    }
    postTime= time(NULL);
    printf("Wall time: %ld seconds\n", (long)(postTime - preTime));
  }
}

是的,我正在尝试找出所有数字中的最高数字。所以我在过去的几个小时里取得了一些进展,正如 spudd 所说的那样重建程序,目前由于我使用结构我遇到了段错误,我试图在结构中保存最大的单个素数,同时给它们正确的指数。这是修改后的代码。所以简而言之,第一个线程正在做的是创建所有线程并为它们提供一个非常大的整数数组的访问点,它们将通过该数组并找到素数,我想在 while 循环周围实现信号量,以便在它们执行时每 2000 行或末尾更新一个全局质数。

#include <stdio.h>  
#include <stdlib.h> 
#include <time.h>   
#include <string.h>
#include <pthread.h> 
#include <math.h>
#include <semaphore.h>

//Global variable declaration
char *file1 = "primes1.txt";
char *file2 = "primes2.txt";
char *file3 = "primes3.txt";
char *file4 = "primes4.txt";
char *file5 = "primes5.txt";
char *file6 = "primes6.txt";
char *file7 = "primes7.txt";
char *file8 = "primes8.txt";
char *file9 = "primes9.txt";
char *file10 = "primes10.txt";



int numberOfThreads;
int entries[10000000];
int entryIndex = 0;
int fileCount = 0;
char** fileName;
int largestPrimeNumber = 0;


//Register functions
int prime(int n);
int getNum(FILE* file);
void* findPrimality(void *threadNum);
void* assign(void *num);

typedef struct package{
  int largestPrime;
  int startingIndex;
  int numberCount;
}pack;



//Beging main code block
int main(int argc, char* argv[])
{

  if(argc != 2){ //checks for number of arguements being passed
printf("To many threads!!\n");
return(-1);
  }
  else{ //Sets thread cound to user input checking for correct number of threads
    numberOfThreads = atoi(argv[1]);
    if(numberOfThreads < 1 || numberOfThreads > 10){
      printf("To many threads entered\n");
      return(-1);
    }

    int threadPointer[numberOfThreads]; //Pointer array to point to entries

    time_t preTime, postTime; //creating time variables

    int i;

    fileName = malloc(10 * sizeof(char*)); //create file array and initialize

    fileName[0] = file1;
    fileName[1] = file2;
    fileName[2] = file3;
    fileName[3] = file4;
    fileName[4] = file5;
    fileName[5] = file6;
    fileName[6] = file7;
    fileName[7] = file8;
    fileName[8] = file9;
    fileName[9] = file10;

    FILE* filereader;
    int currentNum;

    for(i = 0; i < 10; i++){
      filereader = fopen(fileName[i], "r");
      while(!feof(filereader)){
        char* tempString = malloc(20 *sizeof(char));
        fgets(tempString, 20, filereader);
        tempString[strlen(tempString)-1] = '\0';
        entries[entryIndex] = atoi(tempString);
        entryIndex++;
        free(tempString);       
      }
    }

    //sem_init(&semAccess, 0, 1); //initialize semaphores
    //sem_init(&semAssign, 0, numberOfThreads);
    time_t tPre, tPost;



    pthread_t coordinate;

    tPre = time(NULL);
    pthread_create(&coordinate, NULL, assign, (void**)numberOfThreads);
    pthread_join(coordinate, NULL);


    tPost = time(NULL);



  }

}

void* findPrime(void* pack_array)
{
  pack* currentPack=  pack_array;
  int lp = currentPack->largestPrime;
  int si = currentPack->startingIndex;
  int nc = currentPack->numberCount;

  int i;
  int j = 0;


  for(i = si; i < nc; i++){

    while(j < 2000 || i == (nc-1)){

      if(prime(entries[i])){

    if(entries[i] > lp)

      lp = entries[i];
      }

      j++;

    }

  }
   return (void*)currentPack; 
}

void* assign(void* num)
{
  int y = (int)num;
  int i;

  int count = 10000000/y;
  int finalCount = count + (10000000%y);

  int sIndex = 0;



  pack pack_array[(int)num];
  pthread_t workers[numberOfThreads]; //thread to do the workers


  for(i = 0; i < y; i++){
    if(i == (y-1)){
      pack_array[i].largestPrime = 0;
      pack_array[i].startingIndex = sIndex;
      pack_array[i].numberCount = finalCount;
    }

    pack_array[i].largestPrime = 0;
    pack_array[i].startingIndex = sIndex;
    pack_array[i].numberCount = count;


    pthread_create(&workers[i], NULL, findPrime, (void *)&pack_array[i]);
    sIndex += count;
  }
  for(i = 0; i< y; i++)
    pthread_join(workers[i], NULL);
}




//Functions

int prime(int n)//check for prime number, return 1 for prime 0 for nonprime
{
  int i;
  for(i = 2; i <= sqrt(n); i++)
    if(n % i == 0)
      return(0);

    return(1);
}

这是我的最新更新,我的线程运行有问题,唯一的线程是线程 0 正在完成

#include <stdio.h>  
#include <stdlib.h> 
#include <time.h>   
#include <string.h>
#include <pthread.h> 
#include <math.h>
#include <semaphore.h>

//Global variable declaration
char *file1 = "primes1.txt";
char *file2 = "primes2.txt";
char *file3 = "primes3.txt";
char *file4 = "primes4.txt";
char *file5 = "primes5.txt";
char *file6 = "primes6.txt";
char *file7 = "primes7.txt";
char *file8 = "primes8.txt";
char *file9 = "primes9.txt";
char *file10 = "primes10.txt";

sem_t semHold;

int numberOfThreads;
long unsigned int entries[10000000];
unsigned int entryIndex = 0;
int fileCount = 0;
char** fileName;
long unsigned int largestPrimeNumber = 0;


//Register functions
int prime(unsigned int n);
int getNum(FILE* file);
void* findPrimality(void *threadNum);
void* assign(void *num);

typedef struct package{
  long unsigned int largestPrime;
  unsigned int startingIndex;
  unsigned int numberCount;
}pack;

pack pack_array[10];


//Beging main code block
int main(int argc, char* argv[])
{

  if(argc != 2){ //checks for number of arguements being passed
printf("To many threads!!\n");
return(-1);
  }
  else{ //Sets thread cound to user input checking for correct number of threads
    numberOfThreads = atoi(argv[1]);
    if(numberOfThreads < 1 || numberOfThreads > 10){
      printf("To many threads entered\n");
      return(-1);
    }

    int threadPointer[numberOfThreads]; //Pointer array to point to entries



    int i;


    fileName = malloc(10 * sizeof(char*)); //create file array and initialize

    fileName[0] = file1;
    fileName[1] = file2;
    fileName[2] = file3;
    fileName[3] = file4;
    fileName[4] = file5;
    fileName[5] = file6;
    fileName[6] = file7;
    fileName[7] = file8;
    fileName[8] = file9;
    fileName[9] = file10;

    FILE* filereader;
    long unsigned int currentNum;

    sem_init(&semHold, 0, 1);

    for(i = 0; i < 10; i++){
      filereader = fopen(fileName[i], "r");
      while(fscanf(filereader, "%lu" , &currentNum)!= EOF){
    entries[entryIndex] = currentNum;
    // while(entryIndex < 5){
      //char* tempString = malloc(20 *sizeof(long unsigned int));
    //fgets(tempString, 20, filereader);

    //tempString[strlen(tempString)-1] = '\0';

    //currentNum = atoi(tempString);
    //printf("Test %lu\n",currentNum);

    //entries[entryIndex] = atoi(tempString);

    //entryIndex++;

    //free(tempString);       
    //}
    entryIndex++;
    }
  }
  printf("Test %lu\n",entries[9999999]);
  //sem_init(&semAccess, 0, 1); //initialize semaphores
  //sem_init(&semAssign, 0, numberOfThreads);
  time_t tPre, tPost;



  pthread_t coordinate;

  tPre = time(NULL);

  pthread_create(&coordinate, NULL, assign, (void**)numberOfThreads);

  pthread_join(coordinate, NULL);


  tPost = time(NULL);

  printf("Largest prime = %lu , time: %ld\n", largestPrimeNumber,(long)(tPost-tPre));



}

}

void* findPrime(void* pack_array)
{

  pack* currentPack =  pack_array;
  unsigned int lp = currentPack->largestPrime;
  unsigned int si = currentPack->startingIndex;
  unsigned int nc = currentPack->numberCount;

  int i;
  printf("Starting index Count: %d\n", si);
  for(i = si; i < nc; i++){
    if(i%100000==0)
      printf("Here is i: %d\n", i);
    if(entries[i]%2 != 0){
      if(entries[i] > currentPack->largestPrime){      
    if(prime(entries[i])){

      currentPack->largestPrime = entries[i];
      printf("%lu\n", currentPack->largestPrime);

    if(currentPack->largestPrime > largestPrimeNumber)
      sem_wait(&semHold);
      largestPrimeNumber = currentPack->largestPrime;
      sem_post(&semHold);
    }
      }    
    }
  }

}

void* assign(void* num)
{

  int y = (int)num;
  int i;

  int count = 10000000/y;
  int finalCount = count + (10000000%y);

  int sIndex = 0;
  printf("This is count: %d\n", count);
  printf("This is final count: %d\n", finalCount);


  pthread_t workers[y]; //thread to do the workers


  for(i = 0; i < y; i++){
    printf("for thread %d Starting index: %d\n", i, sIndex);
    if(i == (y-1)){
      pack_array[i].largestPrime = 0;
      pack_array[i].startingIndex = sIndex;
      pack_array[i].numberCount = finalCount;
    }

    pack_array[i].largestPrime = 0;
    pack_array[i].startingIndex = sIndex;
    pack_array[i].numberCount = count;


    pthread_create(&workers[i], NULL, findPrime, (void *)&pack_array[i]);
    printf("thread created\n");
    sIndex += count;

  }
  for(i = 0; i < y; i++)
    pthread_join(workers[i], NULL);

}




//Functions

int prime(unsigned int n)//check for prime number, return 1 for prime 0 for nonprime
{
  int i;
  for(i = 2; i <= sqrt(n); i++)
    if(n % i == 0)
      return(0);

    return(1);
}

最佳答案

好的,这是我的解决方案的一部分,它缺少大部分主要内容,并且缺少一些其他简单的东西,如果您选择以此为基础编写代码,您可以执行以下两项操作之一加载所有数据,然后再启动您的工作程序,或者让主线程在工作人员运行时加载它,我在完整版本中做了后者。但是,您必须做一些工作才能正确处理它,因为目前工作人员永远不会退出。

您可能还想尝试根据此调整上面的单个数组代码。

因此,如果您在启动 worker 之前加载所有数据,则不需要条件变量,它们可以在 next_chunkNULL 时退出。我建议你弄清楚如何在工作人员运行时加载,因为这样会更有效率。 提示:pthread_cond_broadcast()

还缺少实际的 worker 函数。

// A singly linked list of chunks of 1000 numbers
// we use it as a queue of data to be processed
struct number_chunk
{
    struct number_chunk *next;
    int size;
    int nums[1000];
};

pthread_mutex_t cnklst_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t data_available = PTHREAD_COND_INITIALIZER;
struct number_chunk *next_chunk = NULL;

void load_chunks(char *filename)
{
    FILE *in = fopen(filename, "r");
    int done = 0;
    int i;

    if(in == NULL) {
        fprintf(stderr, "Failed to open file %s\n", filename);
        return;
    }

    // read in all the chunks of 1000 numbers from the file
    while(!done) {
        struct number_chunk *cnk = malloc(sizeof(struct number_chunk)); // allocate a new chunk
        cnk->next = NULL;
        for(i=0; i < 1000; i++) { // do the actual reading
            char tmp[20];
            if(fgets(tmp, 20, in) == NULL) { // end of file, leave the read loop
                done = 1;
                break; 
            }
            cnk->nums[i] = atoi(tmp);
        }

        // need to do this so that the last chunk in a file can have less than 1000 numbers in it
        cnk->size = i;

        // add it to the list of chunks to be processed
        pthread_mutex_lock(&cnklst_mutex);
        cnk->next = next_chunk;
        next_chunk = cnk;
        pthread_cond_signal(&data_available); // wake a waiting worker
        pthread_mutex_unlock(&cnklst_mutex);
    }

    fclose(in);
}

struct number_chunk *get_chunk()
{
    struct number_chunk *cnk = NULL;
    pthread_mutex_lock(&cnklst_mutex);
    //FIXME: if we finish we will never exit the thread
    // need to return NULL when all the work that there will ever be
    // is done, altertitively load everything before starting the workers and 
    // get rid of all the condition variable stuff
    while(next_chunk == NULL)
        pthread_cond_wait(&data_available, &cnklst_mutex);
    cnk = next_chunk;
    if(next_chunk != NULL) next_chunk = next_chunk->next;
    pthread_mutex_unlock(&cnklst_mutex);
    return cnk;
}

我的工作人员报告最终最大素数的方式是在最后通过查看单个全局变量并根据他们在运行期间发现的最高素数设置或不设置它来完成。显然,您需要为此进行同步。

另请注意,由于使用了 pthread_cond_wait(),它使用了互斥锁而不是信号量。/p>

此外,由于这是家庭作业,请阅读我的代码尝试理解它,然后不要再看一遍尝试编写您自己的代码。

我本可以对其进行更多更改,但我不确定如何更改,因为它基本上已经是一个真正通用的生产者/消费者示例,但缺少一些内容:P

如果您决定采用与我相同的策略并在工作人员工作时在主线程中运行加载,则可以尝试另一件事,您可以添加第二个条件变量和一个计数器来限制队列中的 block 数并让您的工作人员在工作用完时唤醒主线程。

关于c - 跨多个文件线程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/13806803/

相关文章:

c++ - 一段时间后如何停用输入语句?

c - C 中的内存错误

mysql - 夹板和 MySQL : Null storage passed as non-null param

c++ - Qt库——静态成员函数的线程安全

python - Gunicorn 线程没有任何区别

c - printf 64 位类型说明符问题

c - 反斜杠换行符在#if 和#elif 上有效吗?

python - 一会儿监听输入,并发启动一个线程

c++ - 实现线程锁时真的需要原子性吗?

c - Del. and r/w linked List Nodes in Multithreading with RW-locks -> 死锁?