问题描述
我正在创建一个多线程文件服务器。我创建了一个线程池来处理来自客户端的请求。这是我的线程池代码:
void *worker_call(void *thread_id){
printf("Initialized thread #%ld \n",(long)thread_id);
//pull in initialized global configs
extern pthread_mutex_t queue_m;
extern pthread_cond_t worker_c;
extern steque_t queue;
while(1){
if (pthread_mutex_lock(&queue_m) != 0){
fprintf(stderr,"An error occured while locking mutex in #%ld \n",(long)thread_id);
}
while(steque_isempty(&queue) == 1){
printf("thread #%ld - Going to sleep...\n",(long) thread_id);
pthread_cond_wait(&worker_c,&queue_m);
printf("thread #%ld - I'm waking up...\n",(long) thread_id);
}
int *work = steque_pop(&queue);
if (pthread_mutex_unlock(&queue_m) != 0){
fprintf(stderr,"An error occured while unlocking mutex in #%ld \n",(long)thread_id);
}
pthread_cond_broadcast(&worker_c);
sleep(1); //added to make sure that the other threads have a chance to wake up
printf("thread #%ld - what is the value of work: %d\n",(long) thread_id,*work);
// process_request(&(work->ctx),work->path,work->arg,(long)thread_id);
free(work);
}
return NULL;
}
要测试我的线程池是否正确启动并且所有线程都在正常工作,我创建了以下测试:
int main(){
int nthreads = 6;
pthread_t threads[nthreads];
long thread_ids[nthreads];
pthread_attr_t thread_attr;
pthread_attr_init(&thread_attr);
pthread_attr_setdetachstate(&thread_attr,PTHREAD_CREATE_JOINABLE);
pthread_attr_setscope(&thread_attr,PTHREAD_ScopE_SYstem);
for(int t=0;t<nthreads;t++){
thread_ids[t] = t;
if( pthread_create(&threads[t],NULL,worker_call,(void *)thread_ids[t]) != 0 ) {
printf("An error occured while creating thread: %d\n",t);
}
}
pthread_attr_destroy(&thread_attr);
//init multi-threading configs
extern pthread_mutex_t queue_m;
pthread_mutexattr_t m_attr;
pthread_mutexattr_init(&m_attr);
pthread_mutexattr_settype(&m_attr,PTHREAD_MUTEX_ERRORCHECK);
pthread_mutex_init(&queue_m,NULL);
extern pthread_cond_t worker_c;
pthread_cond_init(&worker_c,NULL);
extern steque_t queue;
steque_init(&queue);
//create a simple queue with each item being an int.
//the goal is to simply remove the items from the queue.
for(int i = 0; i < 5; i++){
int *work = malloc(sizeof(int));
*work = i;
steque_enqueue(&queue,work);
}
printf("queue size after creating it: %d\n",steque_size(&queue));
pthread_cond_broadcast(&worker_c);
for(int t=0;t<nthreads;t++){
pthread_join(threads[t],NULL);
}
printf("All the threads finished processing\n");
return 0;
}
现在此代码已成功运行,但是,只有一个线程被唤醒,它可以完成所有工作,如下面的输出所示:
Initialized thread #0
thread #0 - Going to sleep...
Initialized thread #1
thread #1 - Going to sleep...
Initialized thread #2
thread #2 - Going to sleep...
Initialized thread #3
thread #3 - Going to sleep...
Initialized thread #4
thread #4 - Going to sleep...
Initialized thread #5
thread #5 - Going to sleep...
queue size after creating it: 5
thread #5 - I'm waking up...
thread #5 - what is the value of work: 0
thread #5 - what is the value of work: 1
thread #5 - what is the value of work: 2
thread #5 - what is the value of work: 3
thread #5 - what is the value of work: 4
thread #5 - Going to sleep...
我的问题是,为什么其他线程不醒来并从队列中抓取项目?我已经尝试在发出pthread_cond_broadcast之后添加一秒的延迟,以给其他线程足够的时间来锁定互斥锁,但是该方法没有成功。有人看到我在做什么错吗?
解决方法
我无法发布简单的评论,因为我没有+50代表。因此,我必须将其发布为答案。
当您说“成功运行”时,您对成功的定义是什么?这里有几个比赛条件令人震惊,我很惊讶这一切都奏效。例如,worker_call()在实际初始化之前访问队列,互斥量和cond变量。
此外,当您最初将项目推入main()中的队列时,您应该将该代码包装在互斥锁中,因为您的线程也在尝试访问该变量。
我的建议是先解决这些比赛条件,然后再更新结果。
编辑1:
另外,在worker_call中,worker在调用cond_wait之前正在检查队列是否为空,但在唤醒后不检查是否为空。由于项目数量可能少于工作人员,因此该功能将需要再次检查队列是否为空。