像这样使用 pragma omp simd 正确吗?

问题描述


#include <stdio.h>
#include <stdlib.h>
#include <omp.h>
#define pow(x) ((x) * (x))
#define NUM_THREADS 8
#define wmax 1000
#define Nv 2
#define N 5

int b=0;

float Points[N][Nv]={ {0,1},{3,4},{1,2},{5,{8,9}};

float length[wmax+1]={0};

float Eucldist(float* Ne,float* Pe) {
    int i;
    float s = 0;
 
    for (i = 0; i < Nv; i++) {
        s += pow(Ne[i] - Pe[i]); 
    }
    return s; 
}

void distanceFinder(float* a[]){
    
    int i;
    #pragma omp simd
    for (i=1;i<N+1;i++){  
        length[b] += Eucldist(&a[i],&a[i-1]);
    }
    //printf(" %f\n",length[b]);
}



void NewRoute(){
//some irrelevant things

distanceFinder(Points);
}

int main(){
    
omp_set_num_threads(NUM_THREADS);


do{
    b+=1;
    NewRoute();
    } while (b<wmax);
}

尝试并行化这个循环并尝试不同的东西,尝试了这个。

似乎是最快的,但是这样使用 SIMD 是否正确?因为我使用的是以前的迭代(ii - 1)。我看到的结果奇怪与否。

解决方法

似乎是最快的,但是这样使用 SIMD 是否正确?

首先,有一个竞争条件需要修复,即在数组 length[b] 更新期间。此外,您正在访问数组 a 之外的内存; (从 1 迭代到 N + 1),并且您正在传递 &a[i]。您可以使用 OpenMP reduction 子句修复竞争条件

void DistanceFinder(float* a[]){
    
    int i;
    float sum = 0;
    float tmp;
    #pragma omp simd private(tmp) reduction(+:sum)
    for (i=1;i<N;i++){  
        tmp = EuclDist(a[i],a[i-1]);
        sum += tmp;
    }
    length[b] += sum;
}

此外,您需要提供一个 EuclDist 版本,如下所示:

#pragma omp declare simd uniform(Ne,Pe)
float EuclDist(float* Ne,float* Pe) {
    int i;
    float s = 0;
    for (i = 0; i < Nv; i++)
        s += pow(Ne[i] - Pe[i]); 
    return s; 
}

因为我使用的是之前的迭代(i 和 i - 1)。

在你的情况下,没关系,因为数组 a 刚刚被读取。

我看到的结果奇怪与否。

很可能没有进行矢量化。无论如何,由于上述竞争条件,它仍然是未定义行为

您可以简化代码,以增加实际发生矢量化的可能性,例如:

void DistanceFinder(float* a[]){
    int i;
    float sum = 0;
    float tmp;
    #pragma omp simd private(tmp) reduction(+:sum)
    for (i=1;i<N;i++){  
        tmp = pow(a[i][0] - a[i-1][0]) + pow(a[i][1] - a[i-1][1]) 
        sum += tmp;
    }
    length[b] += sum;
}

为提高代码性能可以做的进一步更改是分配矩阵(作为函数 DistanceFinder 的参数传递)的分配方式,当您遍历其行时 (ie,a[i]) 你会遍历连续的内存地址。

例如,您可以传递两个数组 a1a2 来表示矩阵 a 的第一列和第二列:

  void DistanceFinder(float a1[],float a2[]){
        int i;
        float sum = 0;
        float tmp;
        #pragma omp simd private(tmp) reduction(+:sum)
        for (i=1;i<N;i++){  
            tmp = pow(a1[i] - a1[i-1]) + pow(a2[i][1] - a2[i-1][1]) 
            sum += tmp;
        }
        length[b] += sum;
    }