c - ANSI C 中带有 PTHREADS 的执行程序池

标签 c multithreading visual-c++ pthreads

我正在用 ANSI C (1972) 编写程序,我必须使用固定数量的线程。我基本上是在读取一个大文件,其中包含诸如 .csv 之类的记录,其中包含纬度和经度数据,我必须处理它们。问题是我不能在 100.000.000 行文件上等待 2 周,我需要使用 threadsforking

基本上我是这样阅读.txt文件的

FILE *file2 = fopen ( lat_long_file, "r" );
if (file2 != NULL)
{
    char line2 [128];

    while (fgets(line2, sizeof line2, file2) != NULL)
    {
        //fputs(line2, stdout);

        char *this_record = trimqq(line2);

        // .....
        // ..... STUFF TO DO (here i must send data to thread function like in JAVA)
        // Thread temp_thread = new Thread(new ThreadClass(arguments ....));
        // temp_thread.start(); <- this is how i would do if i was programming in JAVA
        // .....

    }
}

ma​​in_1.c(使用 pthread.h 进行线程处理)

#include <pthread.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>

#define NUM_THREADS     10

static int current_threads = 0;


void *wait(void *t)
{
   int i;
   long tid;

   tid = (long)t;

   // sleep(1);

   system("sleep 3; date;");

   printf("Sleeping in thread\n");
   printf("Thread with id %lu  ...exiting\n",tid);

   pthread_exit(NULL);
}

int main ()
{
   int rc;
   int i;
   pthread_t threads[NUM_THREADS];
   pthread_attr_t attr;
   void *status;

   // Initialize and set thread joinable
   pthread_attr_init(&attr);
   pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);

   for( i=0; i < NUM_THREADS; i++ )
   {
     // cout << "main() : creating thread, " << i << endl;
      rc = pthread_create(&threads[i], NULL, wait, (void *)(intptr_t)i );

      if (rc)
      {
        // cout << "Error:unable to create thread," << rc << endl;
         exit(-1);
      }
   }

    // free attribute and wait for the other threads
    pthread_attr_destroy(&attr);
    for( i=0; i < NUM_THREADS; i++ )
    {
        rc = pthread_join(threads[i], &status);
        if (rc)
        {
            printf("Error:unable to join %d\n",rc);
            exit(-1);
        }

        printf("Main: completed thread id : %d",i);
        printf(" exiting with status : %p\n",status);
    }

    printf("Main: program exiting.\n");

    pthread_exit(NULL);
}

我得到这个id的输出

Sleeping in thread
Sleeping in thread
Thread with id 5  ...exiting
Sleeping in thread
Thread with id 0  ...exiting
Sleeping in thread
Sleeping in thread
Sleeping in thread
Thread with id 9  ...exiting
Thread with id 1  ...exiting
Sleeping in thread
Sleeping in thread
Thread with id 7  ...exiting
Thread with id 3  ...exiting
Thread with id 2  ...exiting
Thread with id 6  ...exiting
Sleeping in thread
Thread with id 4  ...exiting
Sleeping in thread
Thread with id 8  ...exiting
Main: completed thread id : 0 exiting with status : (nil)
Main: completed thread id : 1 exiting with status : (nil)
Main: completed thread id : 2 exiting with status : (nil)
Main: completed thread id : 3 exiting with status : (nil)
Main: completed thread id : 4 exiting with status : (nil)
Main: completed thread id : 5 exiting with status : (nil)
Main: completed thread id : 6 exiting with status : (nil)
Main: completed thread id : 7 exiting with status : (nil)
Main: completed thread id : 8 exiting with status : (nil)
Main: completed thread id : 9 exiting with status : (nil)
Main: program exiting.

执行时间为3秒

如果我将 system("sleep 3; date;"); 更改为 system("sleep 10; date;");,执行时间将为 10 秒,而我希望在 void *wait(void *t) 函数的每次调用时休眠......

ma​​in_2_fork(我也试过 fork,但没用)

#include  <stdio.h>
#include  <string.h>
#include  <sys/types.h>
#include <stdlib.h>

#define   MAX_COUNT  200
#define   BUF_SIZE   100

int random_number(int min_num, int max_num);

void  main(void)
{
    int numforks = 0;
    int maxf = 5;
    int status;

    char   buf[BUF_SIZE];

    pid_t PID; 

    int job = 0;
    for(job; job <= 10; job++)
    {
        // fork() = make a copy of this program from this line to the bottom
        PID = fork();

        int fork_id = random_number(1000000,9999999);

        if (PID < 0) 
        {
            // if -1 then couldn't fork ....
            fprintf(stderr, "[!] Couldn't fork!\n");
            exit(1);
        }
        if (( PID == 0 ))
        {
            // 0 = has created a child process
            exit(0);
        }
        else            
        {
            // means that PID is 1 2 3 .... 30000 44534 534634 .... whatever
            // increment the fork count
            numforks++;

            sprintf(buf, "FORK[#%d] BEGIN pid=%d num_forks=%d\n",fork_id,PID,numforks);
            write(1, buf, strlen(buf));

            // sleep(random_number(1,2));

            char str[300];
            sprintf(str,"sleep %d; ps ax | wc -l",random_number(1,4));
            puts(str);

            // OUTPUT COMMAND BEGIN
            FILE *command_execute = popen(str, "r");
            char buf[256];
            int increment = 0;
            while (fgets(buf, sizeof(buf), command_execute) != 0)
            {
                printf("LINE[%d]:%s",increment,buf);
                increment++;
                break;
            }
            pclose(command_execute);
            // OUTPUT COMMAND END   

            // block to not do extra forks
            if (numforks > maxf)
            {
                for (numforks; numforks > maxf; numforks--)
                {
                    PID = wait(&status);
                }
            }

            sprintf(buf, "FORK[#%d] END pid=%d num_forks=%d\n",fork_id,PID,numforks);
            write(1, buf, strlen(buf));
        }

        // sleep(1);
    }
}

int random_number(int min_num, int max_num)
{
    int result=0,low_num=0,hi_num=0;
    if(min_num<max_num)
    {
        low_num=min_num;
        hi_num=max_num+1; // this is done to include max_num in output.
    }
    else
    {
        low_num=max_num+1;// this is done to include max_num in output.
        hi_num=min_num;
    }
    srand(time(NULL));
    result = (rand()%(hi_num-low_num))+low_num;
    return result;
}

输出是:

FORK[#7495656] BEGIN pid=29291 num_forks=1
sleep 1; ps ax | wc -l
LINE[0]:312
FORK[#7495656] END pid=29291 num_forks=1
FORK[#9071759] BEGIN pid=29296 num_forks=2
sleep 4; ps ax | wc -l
LINE[0]:319
FORK[#9071759] END pid=29296 num_forks=2
FORK[#2236079] BEGIN pid=29330 num_forks=3
sleep 4; ps ax | wc -l

......

并且执行不是并行的...而是一个接一个地执行,即使我认为 fork() 函数是在 ps ax | 中创建子进程的grep 'fork2.exe' ...

这是我想要的例子: http://www.javacodegeeks.com/2013/01/java-thread-pool-example-using-executors-and-threadpoolexecutor.html

假设 5 是一次最大线程数。

问题

  1. 为什么 void *wait(void *t) 函数没有正常休眠?为什么 pthread 一个一个地执行它们而不是并行执行?
  2. 我应该怎么做才能在 C 中创建具有固定最大线程数的线程池?

非常感谢。

最佳答案

我还不能发表评论,所以我会在这里回复。您的线程示例正好占用一个线程(您的wait() 函数)休眠的时间。这就是说,如果你这样写会更清楚:

void *some_running_task(void *t)
{
   int i;
   long tid = (long)t;

   printf("Sleeping in thread #%lu ...\n", tid);
   system("sleep 3; date;");

   printf("Thread with #%lu ... exiting\n", tid);
   pthread_exit(NULL);
}

正如@fuzxxl 所说,标准线程库中有一个 wait(),因此您不应该使用它。

您的所有线程同时启动,可能只有几十微秒。它们都在同一时刻开始,因此都在 3 秒后结束。将 sleep 指令更改为 10 秒,您的程序将持续 10 秒。

您可能想要的是一个线程池,它在整个工作完成之前一直保持相同数量的线程忙碌:只要有数据要处理,就会触发一个线程直到达到最大池数。但是,同步线程池很容易出现死锁。您还不如让每个线程处理它自己的文件部分……除非您想要的是将一个线程专用于一行。

我在这里看到的并行性的一个问题是顺序。 如果您关心序列顺序,则线程不一定会按照您读取行的相同顺序生成数据。因此,除非您将处理后的数据连同行号一起放入数据库,否则您将丢失序列顺序。

我看到的另一个问题是输出处理过的数据。它需要适当的同步以避免一个线程的输出不会弄乱另一个线程的输出(当然,iif 线程应该打印出它们的数据)。

除了加快全局处理时间之外,我不太清楚您对并行性的期望是什么。如果你想要一堆线程来处理一堆行,你无论如何都会想出一些类似的东西,就像分割你的源数据文件一样简单......当然,如果它可以完成的话。但至少您可以在读取每一行时控制数据的顺序,然后您可以回退到触发长时间运行的单线程进程而不是长时间运行的多线程应用程序。单线程应用程序比多线程应用程序更容易编程。

是否也必须使用 C,您不能使用 Python 或 Cython ?最大的优势是免去了线程同步的麻烦。

无论如何,加速线性数据处理的方法不止一种。例如 UNIX sed可用于将一定数量的行通过管道传输到处理应用程序。尽可能多地运行 sed | <processing app>如你所愿。或者,您可能只是将数据文件的拆分部分通过管道传输到用 C 或 Python 编写的处理应用程序中。

只是提供标题。

关于c - ANSI C 中带有 PTHREADS 的执行程序池,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27460765/

相关文章:

windows - 任何我不应该在我的Visual C++程序中添加GUID验证的原因?

c++ - 如何在msvc中添加高亮关键字

java - 多线程不比单线程快(简单循环测试)

c - 具有结构指针数组的嵌套结构

c++ - 原生显卡功能

c++ - float 乘以零是否保证产生零?

swift - struct(不变性)与线程安全有何关系

java - 终止主线程而不确保终止由它生成的线程

c++ - Visual 中#pragma comment(user) 和 gcc 中#pragma Comment 的用途

c - 如何在8051中将单个特定引脚设置为输入或输出?