我有一个生产者线程,它从源文件中读取每个字符及其偏移量,然后将其写入共享循环缓冲区。
我还有一个消费者线程,它读取缓冲区中最旧的元素,然后将其写入复制文件。
生产者和消费者线程的数量作为命令行参数给出,而且每次生产者/消费者线程读取或写入缓冲区,或者读取或写入文件时,都会将特定行写入日志文件。
现在我有一个用于生产者和消费者线程的关键部分。我如何构建它才能最大限度地减少在关键部分花费的时间(它变得很慢)。我正在考虑为我的生产者和消费者线程设置多个关键部分。例如,在我的生产者线程中,我可以有一个关键部分用于从源文件读取并将特定行(例如“从文件读取”)写入日志文件,另一个关键部分用于写入缓冲区并写入特定行(例如“写入缓冲区”)到日志文件。但是,如果关键部分是一个接一个的,这会产生什么影响呢?
这是我的制作人线程:
void *INthread(void *arg)
{
printf("INSIDE INthread\n");
FILE *srcFile = (FILE*)arg;
FILE *lp; // Log file pointers.
int t_id = INid++; // Thread number.
int curOffset;
BufferItem result;
struct timespec t;
t.tv_sec = 0;
t.tv_nsec = rand()%(TEN_MILLIS_IN_NANOS+1);
nanosleep(&t, NULL);
fseek(srcFile, 0, SEEK_CUR);
curOffset = ftell(srcFile); // Save the current byte offset.
fseek(srcFile, 0, SEEK_END);
if(len == 0) // If len hasnt been set by the first IN thread yet.
len = ftell(srcFile); // Save the length of the srcFile (number of chars).
fseek(srcFile, curOffset, SEEK_SET); // Revert file pointer to curOffset.
printf("ID %d number of bytes %d\n", t_id, len);
int offs;
int ch;
while(len > 0) // Go through each byte/char in file.
{
/*** CRITICAL SECTION ********************************/
sem_wait(&empty); /* acquire the empty lock */
pthread_mutex_lock( &pt_mutex );
if(len > 0){
fseek(srcFile, 0, SEEK_CUR);
if((offs = ftell(srcFile)) != -1)
result.offset = offs; /* get position of byte in file */
if((ch = fgetc(srcFile)) != EOF)
result.data = ch; /* read byte from file */
// Write to log file "read_byte PTn Ox Bb I-1".
if (!(lp = fopen(log, "a"))) {
printf("could not open log file for writing");
}
if(fprintf(lp, "read_byte PT%d O%d B%d I-1\n", t_id, offs, ch) < 0){
printf("could not write to log file");
}
printf("ID %d --- offset %d char %c len%d\n", t_id, result.offset, result.data, len);
addItem(&cBuff, &result);
// Write to log file "produce PTn Ox Bb Ii ".
if(fprintf(lp, "produce PT%d O%d B%d I%d\n", t_id, offs, ch, cBuff.lastInd) < 0){
printf("could not write to log file");
}
fclose(lp);
len--;
}
pthread_mutex_unlock( &pt_mutex );
sem_post(&full); /* signal full */
/*** END CRITICAL SECTION ********************************/
t.tv_sec = 0;
t.tv_nsec = rand()%(TEN_MILLIS_IN_NANOS+1);
nanosleep(&t, NULL);
}
inJoin[t_id] = 1; // This IN thread is ready to be joined.
printf("EXIT INthread\n");
pthread_exit(0);
}
这是我的消费者线程:
void *OUTthread(void *arg)
{
printf("INSIDE OUTthread\n");
struct timespec t;
t.tv_sec = 0;
t.tv_nsec = rand()%(TEN_MILLIS_IN_NANOS+1);
nanosleep(&t, NULL);
int processing = 1;
FILE *targetFile, *lp;
BufferItem OUTresult;
int t_id = OUTid++;
int offs, ch;
int numBytes = len;
while(processing){
/*** CRITICAL SECTION ********************************/
sem_wait(&full); /* acquire the full lock */
pthread_mutex_lock( &pt_mutex );
cbRead(&cBuff, &OUTresult);
offs = OUTresult.offset;
ch = OUTresult.data;
if (!(lp = fopen(log, "a"))) {
printf("could not open log file for writing");
}
// Write to log file "consume CTn Ox Bb Ii".
if(fprintf(lp, "consume CT%d O%d B%d I%d\n", t_id, offs, ch, cBuff.lastInd) < 0){
printf("could not write to log file");
}
printf("From buffer: offset %d char %c\n", OUTresult.offset, OUTresult.data);
if (!(targetFile = fopen(arg, "r+"))) {
printf("could not open output file for writing");
}
if (fseek(targetFile, OUTresult.offset, SEEK_SET) == -1) {
fprintf(stderr, "error setting output file position to %u\n",
(unsigned int) OUTresult.offset);
exit(-1);
}
if (fputc(OUTresult.data, targetFile) == EOF) {
fprintf(stderr, "error writing byte %d to output file\n", OUTresult.data);
exit(-1);
}
// Write to log file "write_byte CTn Ox Bb I-1".
if(fprintf(lp, "write_byte CT%d O%d B%d I-1\n", t_id, offs, ch) < 0){
printf("could not write to log file");
}
fclose(lp);
fclose(targetFile);
pthread_mutex_unlock( &pt_mutex );
sem_post(&empty); /* signal empty */
/*** END CRITICAL SECTION ********************************/
t.tv_sec = 0;
t.tv_nsec = rand()%(TEN_MILLIS_IN_NANOS+1);
nanosleep(&t, NULL);
}
outJoin[t_id] = 1; // This OUT thread is ready to be joined.
printf("EXIT OUTthread\n");
pthread_exit(0);
}
最佳答案
我会组织这个,以便您的代码采用以下形式:
... processing ...
mutex lock
resource read / write
mutex unlock
... continue processing
围绕要共享的每个资源。因此,您最终将拥有多个互斥体,一个用于读取文件的生产者,一个用于写入循环缓冲区的生产者。一个供消费者从循环缓冲区读取......分机。每个都将封装对其所尊重的资源的单个读/写操作。
我还会这样做,以便您的循环缓冲区不能覆盖自身,否则您将无法同时读取和写入缓冲区,只为读/写操作创建一个互斥体,从而有效地增加消费者和生产者线程等待时间。
这可能不是“最佳”解决方案,但它的处理速度应该比在大部分代码周围设置一个巨大的锁要快。
关于c - 构建多线程的关键部分,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/24497293/