我是线程新手,我想弄清楚为什么在带有队列的线程池的实现中存在互斥体(dstrymutex) 它是在 c 文件中定义的,而不是像所有其他互斥锁一样作为 struct threadpool 的一部分。是否有一个原因? 当我们这样做时,我很想知道声明互斥体的正确位置,这些互斥体的使用方式与此处的使用方式相同。 非常感谢!
代码如下: h 文件:
#ifndef __THREAD_POOL__
#define __THREAD_POOL__
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include "osqueue.h"
#define FAILURE -1
#define SUCCESS 0
#define DONT_WAIT_FOR_TASKS 0
typedef struct thread_pool
{
//The field x is here because a struct without fields
//doesn't compile. Remove it once you add fields of your own
int numOfThreads;
pthread_t* threads;
struct os_queue* tasksQueue;
pthread_mutex_t lock;
pthread_mutex_t queueLock;
pthread_cond_t notify;
int stopped;
int canInsert;
}ThreadPool;
/**
* creates a thread pool struct.
* @param numOfThreads number of threads in the thread pool.
* @return reference to new thread pool struct if succeeded, NULL if failed.
*/
ThreadPool* tpCreate(int numOfThreads);
/**
* Destroys the thread pool.
* @param threadPool thread pool
* @param shouldWaitForTasks 0 - dont wait for tasks in the queue, else - wait for tasks.
*/
void tpDestroy(ThreadPool* threadPool, int shouldWaitForTasks);
/**
* inserts a task to the tasks queue of the thread pool.
* @param threadPool thread pool
* @param computeFunc task
* @param param argument to the task
* @return 0- success , -1 - fail
*/
int tpInsertTask(ThreadPool* threadPool, void (*computeFunc) (void *), void* param);
#endif
c文件:
#include <fcntl.h>
#include "threadPool.h"
#define STDERR_FD 2
#define SYS_CALL_FAILURE 10
pthread_mutex_t destryLock;
typedef struct task
{
void (*computeFunc)(void *param);
void* param;
}Task;
/**
* prints error in sys call to stderr.
*/
void printErrorInSysCallToSTDERR() {
char error_msg[] = "Error in system call\n";
write(STDERR_FD, error_msg, sizeof(error_msg));
}
/**
* threads function. tasks are taken and executed by the threads in the thread pool from the tasks queue.
* @param args expected ThreadPool*
* @return void
*/
void* execute(void* args) {
ThreadPool* tp = (ThreadPool*)args;
struct os_queue* taskQueue = tp->tasksQueue;
printf("New thread was created\n");
while (!tp->stopped && !(tp->canInsert == 0 && osIsQueueEmpty(taskQueue))) {
/* Lock must be taken to wait on conditional variable */
pthread_mutex_lock(&(tp->queueLock));
/* Wait on condition variable, check for spurious wakeups.
When returning from pthread_cond_wait(), we own the lock. */
if((osIsQueueEmpty(taskQueue)) && (!tp->stopped)) {
printf("Busy\n");
pthread_cond_wait(&(tp->notify), &(tp->queueLock));
}
pthread_mutex_unlock(&(tp->queueLock));
pthread_mutex_lock(&(tp->lock));
if (!(osIsQueueEmpty(taskQueue))) {
// take task from the queue
Task* task = osDequeue(taskQueue);
pthread_mutex_unlock(&(tp->lock));
// execute task
task->computeFunc(task->param);
free(task);
}
else {
pthread_mutex_unlock(&(tp->lock));
}
}
}
/**
* creates a thread pool struct.
* @param numOfThreads number of threads in the thread pool.
* @return reference to new thread pool struct if succeeded, NULL if failed.
*/
ThreadPool* tpCreate(int numOfThreads) {
int out = open("output", O_CREAT | O_TRUNC | O_WRONLY, 0644);
if (out == -1) {
printf("Failed to open output file\n");
printErrorInSysCallToSTDERR();
exit(SYS_CALL_FAILURE);
}
// replace standard output with output file
if (dup2(out, STDOUT_FILENO) == -1) {
printf("Failed to operate dup2 for out\n");
printErrorInSysCallToSTDERR();
exit(SYS_CALL_FAILURE);
}
ThreadPool* tp = (ThreadPool*)malloc(sizeof(ThreadPool));
if (tp == NULL) {
printf("Failure: allocate memory for thread pool struct");
return NULL;
}
tp->numOfThreads = numOfThreads;
tp->threads = (pthread_t*)malloc(sizeof(pthread_t) * tp->numOfThreads);
if (tp->threads == NULL) {
printf("Failure: allocate memory for threads array");
return NULL;
}
tp->tasksQueue = osCreateQueue();
pthread_mutex_init(&(tp->lock), NULL);
tp->stopped = 0;
tp->canInsert = 1;
if (pthread_mutex_init(&(tp->queueLock), NULL) != 0 ||
pthread_mutex_init(&(tp->queueLock), NULL) != 0 ||
pthread_cond_init(&(tp->notify), NULL) != 0) {
printf("Failure: initialize one required mutex or more\n");
tpDestroy(tp, 0);
return NULL;
}
int i;
for (i = 0; i < tp->numOfThreads; i++) {
if(pthread_create(&(tp->threads[i]), NULL, execute, (void *)tp) != 0) {
printf("Failure: creating a thread failed.\n");
}
}
return tp;
}
/**
* inserts a task to the tasks queue of the thread pool.
* @param threadPool thread pool
* @param computeFunc task
* @param param argument to the task
* @return 0- success , -1 - fail
*/
int tpInsertTask(ThreadPool* threadPool, void (*computeFunc) (void *), void* param) {
if(threadPool == NULL || computeFunc == NULL) {
return FAILURE;
}
if (!(threadPool->canInsert)) {
return FAILURE;
}
Task* task = (Task*)malloc(sizeof(Task));
if (task == NULL) {
printf("Failure: allocate memory for threads array");
return FAILURE;
}
task->computeFunc = computeFunc;
task->param = param;
osEnqueue(threadPool->tasksQueue, (void *)task);
pthread_mutex_lock(&(threadPool->queueLock));
// wake up thread that wait as long as the tasks queue is empty
if(pthread_cond_signal(&(threadPool->notify)) != 0) {
printf("Failure: signal opertion in tpInsertTask\n");
}
pthread_mutex_unlock(&(threadPool->queueLock));
return SUCCESS;
}
/**
* Destroys the thread pool.
* @param threadPool thread pool
* @param shouldWaitForTasks 0 - dont wait for tasks in the queue, else - wait for tasks.
*/
void tpDestroy(ThreadPool* threadPool, int shouldWaitForTasks) {
if (threadPool == NULL) {
return;
}
pthread_mutex_lock(&destryLock);
// first time enter to tpDestory with valid thread pool
if ( threadPool->canInsert != 0) {
threadPool->canInsert = 0;
// make sure tpDestroy will ne called only once for thr thread pool
} else {
return;
}
pthread_mutex_unlock(&destryLock);
if (shouldWaitForTasks == DONT_WAIT_FOR_TASKS) {
threadPool->stopped = 1;
}
int i, err;
pthread_mutex_lock(&(threadPool->queueLock));
/* Wake up all worker threads */
if((pthread_cond_broadcast(&(threadPool->notify)) != 0) ||
(pthread_mutex_unlock(&(threadPool->queueLock)) != 0)) {
printf("Exit due failure in tpDestory\n");
exit(1);
}
for (i = 0; i < threadPool->numOfThreads; i++) {
err = pthread_join(threadPool->threads[i], NULL);
if (err != 0) {
printf("Failure: waiting for thread no. %d\n", i);
}
}
threadPool->stopped = 1;
//free memory
while (!osIsQueueEmpty(threadPool->tasksQueue)) {
printf("Task was erased from tasks queue\n");
Task* task = osDequeue(threadPool->tasksQueue);
free(task);
}
osDestroyQueue(threadPool->tasksQueue);
free(threadPool->threads);
pthread_mutex_destroy(&(threadPool->lock));
pthread_mutex_destroy(&(threadPool->queueLock));
pthread_mutex_destroy(&destryLock);
free(threadPool);
}
最佳答案
从代码中尚不完全清楚 destryLock
互斥体的意图是什么,特别是因为它不是用 PTHREAD_MUTEX_INITIALIZER
初始化的。静态初始化器,也不是用 pthread_mutex_init 初始化的。然而它在 tpDestroy
函数中被销毁,因此任何对 pthread_mutex_lock
的调用可能会产生 EINVAL
错误。
话虽这么说,根据 tpDestroy
看起来应该要做的事情,即销毁使用 tpCreate
创建的线程池对象,该代码中不清楚逻辑的意图是什么;应该注意的是,这样可能会发生死锁情况:
pthread_mutex_lock(&destryLock);
// first time enter to tpDestory with valid thread pool
if ( threadPool->canInsert != 0) {
threadPool->canInsert = 0;
// make sure tpDestroy will ne called only once for thr thread pool
} else {
return; // dead lock since not unlocking after having locked
}
pthread_mutex_unlock(&destryLock);
这让人相信这段代码是由不完全理解多线程或不太理解设计如何与线程池相适应的人构建的(至少部分是这样的)。
将 destryLock
互斥量放在线程池结构本身中是有意义的,因为该函数是在传入的线程池对象上操作,而不是在全局对象上操作。
I would love to know the correct place to declare mutexes that are being used in the same way they are used here.
考虑到您对多线程和同步原语的理解,这个问题有点宽泛,相反,我将重点讨论您为什么需要互斥体以及您想要互斥体的位置它。
互斥锁允许多个线程阻塞代码区域,以便一次只有一个线程可以访问该代码。这样做是因为在多核系统上,多个线程完全有可能同时访问相同的数据,从而导致出现竞争条件,从而出现未定义的行为。
如果您想阻止来自多个线程的代码,那么 where 可以变得更加清晰,因为您将能够确定互斥体是否应该是全局/局部静态对象,或者它是否应该是一个成员对象。
举个例子,假设我有一个游戏,里面有很多敌人;我可能会将敌人的集合保存在某种列表中。当我想要迭代敌人列表时,例如碰撞检测、AI 或其他游戏效果,如果我的游戏中有多个线程作用于敌人列表,我可能需要一个互斥体在执行时锁定整个列表无论敌人的游戏逻辑是什么,这样敌人的状态对于所有线程来说都是准确的。然而,这可能不是最好的选择,因为它可能会导致延迟;相反,我可能希望每个敌人都有一个互斥锁,并且只锁定受逻辑影响的敌人。
因此,更多的是关于您想要保护哪些具有可变状态的对象。
希望能有所帮助。
关于c - 如何以及在何处声明互斥锁,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50709626/