问题描述
#include <stdio.h>
#include <stdlib.h>
#include <omp.h>
#define pow(x) ((x) * (x))
#define NUM_THREADS 8
#define wmax 1000
#define Nv 2
#define N 5
int b=0;
float Points[N][Nv]={ {0,1},{3,4},{1,2},{5,{8,9}};
float length[wmax+1]={0};
float Eucldist(float* Ne,float* Pe) {
int i;
float s = 0;
for (i = 0; i < Nv; i++) {
s += pow(Ne[i] - Pe[i]);
}
return s;
}
void distanceFinder(float* a[]){
int i;
#pragma omp simd
for (i=1;i<N+1;i++){
length[b] += Eucldist(&a[i],&a[i-1]);
}
//printf(" %f\n",length[b]);
}
void NewRoute(){
//some irrelevant things
distanceFinder(Points);
}
int main(){
omp_set_num_threads(NUM_THREADS);
do{
b+=1;
NewRoute();
} while (b<wmax);
}
尝试并行化这个循环并尝试不同的东西,尝试了这个。
似乎是最快的,但是这样使用 SIMD 是否正确?因为我使用的是以前的迭代(i
和 i - 1
)。我看到的结果奇怪与否。
解决方法
似乎是最快的,但是这样使用 SIMD 是否正确?
首先,有一个竞争条件需要修复,即在数组 length[b]
更新期间。此外,您正在访问数组 a
之外的内存; (从 1
迭代到 N + 1
),并且您正在传递 &a[i]
。您可以使用 OpenMP reduction
子句修复竞争条件:
void DistanceFinder(float* a[]){
int i;
float sum = 0;
float tmp;
#pragma omp simd private(tmp) reduction(+:sum)
for (i=1;i<N;i++){
tmp = EuclDist(a[i],a[i-1]);
sum += tmp;
}
length[b] += sum;
}
此外,您需要提供一个 EuclDist
版本,如下所示:
#pragma omp declare simd uniform(Ne,Pe)
float EuclDist(float* Ne,float* Pe) {
int i;
float s = 0;
for (i = 0; i < Nv; i++)
s += pow(Ne[i] - Pe[i]);
return s;
}
因为我使用的是之前的迭代(i 和 i - 1)。
在你的情况下,没关系,因为数组 a
刚刚被读取。
我看到的结果奇怪与否。
很可能没有进行矢量化。无论如何,由于上述竞争条件,它仍然是未定义行为。
您可以简化代码,以增加实际发生矢量化的可能性,例如:
void DistanceFinder(float* a[]){
int i;
float sum = 0;
float tmp;
#pragma omp simd private(tmp) reduction(+:sum)
for (i=1;i<N;i++){
tmp = pow(a[i][0] - a[i-1][0]) + pow(a[i][1] - a[i-1][1])
sum += tmp;
}
length[b] += sum;
}
为提高代码性能可以做的进一步更改是分配矩阵(作为函数 DistanceFinder
的参数传递)的分配方式,当您遍历其行时 (ie,a[i]
) 你会遍历连续的内存地址。
例如,您可以传递两个数组 a1
和 a2
来表示矩阵 a
的第一列和第二列:
void DistanceFinder(float a1[],float a2[]){
int i;
float sum = 0;
float tmp;
#pragma omp simd private(tmp) reduction(+:sum)
for (i=1;i<N;i++){
tmp = pow(a1[i] - a1[i-1]) + pow(a2[i][1] - a2[i-1][1])
sum += tmp;
}
length[b] += sum;
}