使用 pthreads.h 的并行 c 代码使用少于 100% CPU 并且使用更多线程时速度较慢

问题描述

所以我有一个项目,我需要在其中实现生命游戏,然后在 c 中将其并行化。但是,当我尝试使用 pthreads.h 对其进行并行化时,程序在引入更多线程时运行速度较慢,并且 %cpu 低于 100%(在 ubuntu 终端中使用 top 时,我有一个 Ubuntu Windows 子系统)。这是我的代码

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <pthread.h>

/*  Parallel code with PTHREADS v1 */

//Global variables 
int N;          // Size of the world
int nthreads;   // Number of threads
pthread_mutex_t lock;

typedef struct info_thread
{
    int threadID;       // thread ID
    int low;            // lower limit of interval
    int high;           // higher limit of interval
    int **world;        // pointer to the world matrix
    int **neighbors;    // pointer to the neighbors matrix
    //int **neighbors_2;  // pointer to the neighbors_2 matrix
    //int **one_step;     // pointer to the one_step matrix
}t_info;

void * thread_func(void *arg);
void print_world(int **world);
void count_neighbors(int **world,int **neighbors);
void next_step(int **world,int **one_step,int **neighbors);
void update(int **world,int **one_step);
int compare(int **world,int **two_steps,int **old,int status);

int main(int argc,const char *argv[])
{
    if (argc != 5)
    {
        printf("Give the following input arguments:\n");
        printf("N: Size of the NxN world (integer)\n");
        printf("Initial state: random (0),chessboard (1)\n");
        printf("Output: Number of steps until final state (0) \n");
        printf("        Number of steps until final state,initial and final states (1) \n");
        printf("        Number of steps until final state and all states states (2) \n");
        printf("Threads: Number of threads (integer)\n");
        exit(0);
    }

    N = atoi(argv[1]);
    const int pattern = atoi(argv[2]);
    const int output = atoi(argv[3]);
    nthreads = atoi(argv[4]);

    // Create necessary matrices
    const int n = N+1;
    int **buffer = (int **)malloc(6 * n * sizeof(int *));
    for(int i = 0; i < (6*n); i++)
    {
        buffer[i] = (int *)malloc(n*sizeof(int));
    }

    int **world = &buffer[0];
    int **neighbors = &buffer[n];
    int **neighbors_2 = &buffer[2*n];
    int **one_step = &buffer[3*n];
    int **two_steps = &buffer[4*n];
    int **old = &buffer[5*n];

    // Setting a random initial pattern
    if(pattern == 0){
        srand(time(0));
        for (int i = 0; i < N; i++)
        {
            for (int j = 0; j < N; j++)
            {
                int r = rand() % 10;
                if (r > 5)
                    world[i][j] = 1;
                else
                    world[i][j] = 0;
            }
        }
    }
    // Setting a chessboard initial state
    else if(pattern == 1){
        for (int i = 0; i < N; i++)
        {
            for (int j = 0; j < N; j++)
            {
                if(i%2 == 0){
                    if(j%2 == 0)
                        world[i][j] = 0;
                    else
                        world[i][j] = 1;
                }
                else{
                    if(j%2 == 0)
                        world[i][j] = 1;
                    else
                        world[i][j] = 0;
                }
            }
        }
    }

    if(output==1 || output==2){
        printf("Initial state:\n");
        print_world(world);
    }
    
    int status = 1;
    int t = 1;
    update(old,world);

    // Create threads and input info
    pthread_t thread[nthreads];
    t_info threadinfo[nthreads];
    const int interval = N/nthreads;

    while(status == 1)
    {
        for (int k=0; k<nthreads; k++)
        {
            threadinfo[k].threadID = k;
            threadinfo[k].low = k*interval;
            threadinfo[k].high = (k+1)*interval-1;
            threadinfo[k].world = world;
            threadinfo[k].neighbors = neighbors;
        }
        threadinfo[nthreads-1].high = N;

        // Predict one step forward
        pthread_mutex_init(&lock,NULL);
        for (int k=0; k<nthreads; k++)
            pthread_create(&thread[k],NULL,thread_func,(void *)&threadinfo[k]);
        for (int k=0; k<nthreads; k++)
            pthread_join(thread[k],NULL);
        pthread_mutex_destroy(&lock);
        next_step(world,one_step,neighbors);
        
        // Predict two steps forward
        for (int k=0; k<nthreads; k++)
        {
            threadinfo[k].world = one_step;
            threadinfo[k].neighbors = neighbors_2;
        }
        for (int k=0; k<nthreads; k++)
            pthread_create(&thread[k],NULL);
        
        //count_neighbors(one_step,neighbors_2);
        next_step(one_step,two_steps,neighbors_2);

        // Compare all predicted steps
        status = compare(world,old,status);
        
        // Update world with two steps
        update(world,two_steps);

        
        for(int i = 0; i < N; i++)
        {
            for(int j = 0; j < N; j+=2)
            {
                neighbors[i][j] = 0;
                neighbors[i][j+1] = 0;
                neighbors_2[i][j] = 0;
                neighbors_2[i][j+1] = 0;
            }
        }

        if((output == 2) && (status == 1)){
            printf("Step %d:\n",t);
            print_world(one_step);
            printf("Step %d:\n",t+1);
            print_world(two_steps);
        }
        
        // Save prevIoUs step
        update(old,world);
        //t+=1;
        t+=2;
    }

    //printf("It took %d steps to reach the final state\n",t-2);
    printf("It took %d steps to reach the final state\n",(t-3));
    if(output==1 || output ==2){
        printf("Final state:\n");
        print_world(world);
    }    
    
    for (int i = 0; i < (6*n); i++)
    {
        free(buffer[i]);
    }
    free(buffer);
}

void * thread_func(void *arg)
{
    pthread_mutex_lock(&lock);
    t_info *threadinfo = arg;
    int threadID = threadinfo->threadID;
    int low = threadinfo->low;
    int high = threadinfo->high;
    //int **world = threadinfo->world;
    //int **neighbors = threadinfo->neighbors;

    int i; //rows
    int j; //col
    for (i = low; i <= high; i++){
        for (j = 0; j <= N-1; j++){     
            if (i > 0){
                if (j > 0){
                    if (threadinfo->world[i-1][j-1] == 1)
                        threadinfo->neighbors[i][j] +=1;
                }
                if (j < N-1){
                    if (threadinfo->world[i-1][j+1] == 1)
                        threadinfo->neighbors[i][j] +=1;
                }
                if (threadinfo->world[i-1][j] == 1)
                        threadinfo->neighbors[i][j] +=1;
            }
            if (i < N-1){
                if (j > 0){
                    if (threadinfo->world[i+1][j-1] == 1)
                        threadinfo->neighbors[i][j] +=1;
                }
                if (j < N-1){
                    if (threadinfo->world[i+1][j+1] == 1)
                        threadinfo->neighbors[i][j] +=1;
                }
                if (threadinfo->world[i+1][j] == 1)
                        threadinfo->neighbors[i][j] +=1;
            }
            if (j > 0){
                if (threadinfo->world[i][j-1] == 1)
                        threadinfo->neighbors[i][j] +=1;
            }
            if(j < N-1){
                if (threadinfo->world[i][j+1] == 1)
                        threadinfo->neighbors[i][j] +=1;
            }
        }
    }
    pthread_mutex_unlock(&lock);
    pthread_exit(NULL);
}


void print_world(int **world)
{
    for (int i = 0; i < N; i++)
    {
        for (int j = 0; j < N; j+=2)
        {
            printf("%d ",world[i][j]);
            printf("%d ",world[i][j+1]);
        }
        printf("\n");
    }
    printf("\n");   
}

void count_neighbors(int **world,int **neighbors)
{
    int i; //rows
    int j; //col
    for (i = 0; i <= N-1; i++){
        for (j = 0; j <= N-1; j++){     
            if (i > 0){
                if (j > 0){
                    if (world[i-1][j-1] == 1)
                        neighbors[i][j] +=1;
                }
                if (j < N-1){
                    if (world[i-1][j+1] == 1)
                        neighbors[i][j] +=1;
                }
                if (world[i-1][j] == 1)
                        neighbors[i][j] +=1;
            }
            if (i < N-1){
                if (j > 0){
                    if (world[i+1][j-1] == 1)
                        neighbors[i][j] +=1;
                }
                if (j < N-1){
                    if (world[i+1][j+1] == 1)
                        neighbors[i][j] +=1;
                }
                if (world[i+1][j] == 1)
                        neighbors[i][j] +=1;
            }
            if (j > 0){
                if (world[i][j-1] == 1)
                        neighbors[i][j] +=1;
            }
            if(j < N-1){
                if (world[i][j+1] == 1)
                        neighbors[i][j] +=1;
            }
        }
    }
}

void next_step(int **world,int **neighbors)
{
    int i,j;
    for (i = 0; i < N; i++){
        for (j = 0; j < N; j++){
            if (world[i][j] == 1)
            {
                if (neighbors[i][j] == 2 || neighbors[i][j] == 3)
                    one_step[i][j] = 1;
                else
                    one_step[i][j] = 0;
            }
            else if (world[i][j] == 0)
            {
                if (neighbors[i][j] == 3)
                    one_step[i][j] = 1;
                else
                    one_step[i][j] = 0;
            }
        }
    }
}

void update(int **world,int **one_step)
{
    for (int i = 0; i < N; i++)
    {
        for (int j = 0; j < N; j+=2)
        {
            world[i][j] = one_step[i][j];               
            world[i][j+1] = one_step[i][j+1];
        }
            
    }
}

int compare(int **world,int status)
{
    int counter1=0,counter2=0,counter3=0;
    for (int i = 0; i < N; i++)
    {
        for (int j = 0; j < N; j++)
        {
            if(world[i][j] == one_step[i][j])
                counter1++;
            if(world[i][j] == 0)
                counter2++;
            if(old[i][j] == two_steps[i][j])
                counter3++;
        }
    }
    if (counter1 == (N*N))
        status = 0;
    else if(counter2 == (N*N))
        status = 0;
    else if(counter3 == (N*N))
        status = 0;
    return status;
}

当我编译代码并使用 2、4 和 8 个线程运行它时,我得到以下结果:

gcc -o gol gol.c -lpthread
time ./gol 500 1 0 2
It took 1670 steps to reach the final state

real    0m10.064s
user    0m8.971s
sys     0m0.246s
time ./gol 500 1 0 4
It took 1670 steps to reach the final state

real    0m15.694s
user    0m9.976s
sys     0m0.437s
time ./gol 500 1 0 8
It took 1670 steps to reach the final state

real    0m14.600s
user    0m10.400s
sys     0m0.855s

当使用 2 个线程时,使用 top 的 cpu 百分比为 ~65%,使用 4 个线程时为 ~78%,使用 8 个线程时为 ~100%。我做错了什么?

解决方法

你有两个问题:

a) 创建线程并等待它们终止会增加开销。在“while(status == 1)”循环中执行此操作意味着您要重复支付额外的开销。最好创建一次线程(在循环外)然后重新使用现有线程,使用某些东西(例如条件变量)使线程等待循环的下一次迭代。

b) 互斥体的存在是为了防止(不需要的)并行性,并且也有开销。如果线程获得互斥锁,则执行它们的工作,然后释放互斥锁;那么你是故意阻止这些线程之间的所有并行性。对于您的代码,主线程和新创建的线程之间的并行性也会被阻止(主线程在 pthread_join() 中等待)。

本质上;您增加了大量开销(创建和销毁线程、获取和释放互斥锁)并阻止所有并行性以确保没有任何好处可以超过额外开销;导致代码比根本不使用线程更糟糕。

要解决此问题,您需要找到确保线程可以并行执行有用工作的方法。最简单的方法可能是使用 2 个全局数组来表示世界的状态,其中一个数组是“前一个世界状态”,另一个是“下一个世界状态”,并且您交换的位置(指向) 步骤之间的数组。在这种情况下,在一个步骤中,“前一个世界状态”只是被读取(并且许多线程可以毫无问题地并行读取)并且每个线程可以并行更新“下一个世界状态”的不同部分。请注意,由于线程将写入“下一个世界状态”中的每个单元格,因此您也不需要在步骤之间清除“下一个世界状态”。

警告:确保更新数组的一个元素不会导致数组中其他/相邻元素的“缺乏原子性”问题;您将需要将原子类型(sig_atomic_t,我认为是 C11)与二维数组一起使用,或者使用“一维数组指针数组”(其中每一行只能由一个线程修改)和元素是 volatile。请注意,如果世界状态是 8 * 8 网格,您可能可以用单个 uint8_t 表示整行(这意味着它可能成为“指向 volatile uint8_t 的指针的一维数组)。

基本上(如果包括线程的重用)除了等待主线程开始下一步的工作线程以及等待工作线程完成的主线程之外,它可以在不使用任何互斥体的情况下完成当前步骤。

还有;主线程也可以参与做有用的工作,而不是等待工作线程。例如,如果世界状态是一个 8 * 8 的网格,那么“主线程 + 7 个工作线程”可以每个(并行)做一行,以确保所有 8 行都做完。然而,当线程具有相同的优先级时,线程数多于 CPU 很少是明智的;因此,检查计算机有多少 CPU 并限制线程数可能是一个好主意(例如,如果有 4 个 CPU,那么您可能有“主线程 + 3 个以上的线程每个执行 2 行”)。