Problem description
I have written an MPI code that currently parallelizes the work by sending an equal number of elements of each array to a different process (so with 6 workers the array is split into 6 equal parts). What I would like to do instead is send small chunks only when a worker is ready to receive them, and receive finished chunks without blocking future sends; that way, if one chunk takes 10 seconds and another takes 1 second, other data can be processed while waiting for the long chunk to finish.
Here is some basic code I have put together:
#include <mpi.h>
#include <iostream>
#include <vector>
#include <cmath>

struct crazytaxi
{
    double a = 10.0;
    double b = 25.2;
    double c = 222.222;
};

int main(int argc, char** argv)
{
    //Initial and temp kanno vectors
    std::vector<crazytaxi> kanno;
    std::vector<crazytaxi> kanno_tmp;
    //init MPI
    MPI_Init(NULL, NULL);
    //allocate vector
    int SZ = 4200;
    kanno.resize(SZ);
    int world_size;
    MPI_Comm_size(MPI_COMM_WORLD, &world_size);
    int world_rank;
    MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);
    if (world_rank == 0)
    {
        for (int i = 0; i < SZ; i++) {
            kanno[i].a = 1.0*i;
            kanno[i].b = 10.0/(i+1);
        }
    }
    for (int j = 0; j < 10; j++) {
        //Make sure all processes have the same kanno vector;
        if (world_rank == 0) {
            for (int i = 1; i < world_size; i++)
                MPI_Send(&kanno[0], sizeof(crazytaxi)*kanno.size(), MPI_BYTE, i, 3, MPI_COMM_WORLD);
        } else {
            MPI_Recv(&kanno[0], sizeof(crazytaxi)*kanno.size(), MPI_BYTE, 0, 3, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
        }
        //copy to tmp vector
        kanno_tmp = kanno;
        MPI_Barrier(MPI_COMM_WORLD);
        //chunk bounds; in the finished version these would be set per received chunk
        unsigned p1 = 0;
        unsigned segment = 10;
        unsigned p2 = segment;
        //the sender
        if (world_rank == 0) {
            while (p1 < SZ) {
                for (int i = 0; i < world_size; i++) {
                    //if (process #i is ready to receive)
                    //    Send data in chunks of 10 to i
                    //else
                    //    continue
                }
            }
        }
        if (world_rank != 0) {
            //Receive data to be processed
            //do some math
            for (unsigned i = p1; i < p2; i++)
                kanno_tmp[i].a = std::sqrt(kanno[i].a)/((double)i+1.0);
            //Send processed data to 0 and wait to receive new data.
        }
        //copy temp vector to kanno
        kanno = kanno_tmp;
    }
    //print some of the results;
    if (world_rank == 0)
    {
        for (int i = 0; i < SZ; i += 40)
            printf("Line %d: %lg,%lg\n", i, kanno[i].a, kanno[i].b);
    }
    MPI_Finalize();
}
I can convert this about 90% of the way to what I want, except that my MPI_Send and MPI_Recv calls will block, and the 'master' process has no way of knowing that a 'slave' process is ready to receive data.
Is it possible in MPI to do something like this:
unsigned Datapointer = [some_array_index];
while (Datapointer < array_size) {
    if (world_rank == 0) {
        for (int i = 1; i < world_size; i++)
        {
            if (<process i is ready to receive>) {
                MPI_Send([...]);
                Datapointer += 10;
            }
            if (<process i has sent data>)
                MPI_Recv([...]);
            if (Datapointer > array_size) {
                MPI_Bcast([killswitch]);
                break;
            }
        }
    }
}
MPI_Barrier(MPI_COMM_WORLD);
Or, for chunks of variable complexity or nodes of variable speed, is there a more efficient way to structure this?
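In the pseudocode above, the <process i has sent data> check can be expressed without blocking using MPI_Iprobe, and a worker that has just reported a result is by the same token ready to receive its next chunk. Below is a minimal sketch of such a check; poll_ready_worker and result_tag are hypothetical names introduced for the example, not part of the code above.

#include <mpi.h>

// Hypothetical helper: returns the rank of a worker that has a pending message
// with tag result_tag (i.e. it has finished a chunk and can take more work),
// or -1 if nothing has arrived yet. MPI_Iprobe never blocks, so rank 0 can keep
// doing other bookkeeping while it waits.
int poll_ready_worker(int result_tag)
{
    int flag = 0;
    MPI_Status status;
    MPI_Iprobe(MPI_ANY_SOURCE, result_tag, MPI_COMM_WORLD, &flag, &status);
    return flag ? status.MPI_SOURCE : -1;
}

Rank 0 would then MPI_Recv the finished chunk from that rank and MPI_Send it the next one, which is essentially what the solution below does with a blocking receive on MPI_ANY_SOURCE.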
Solution
@Gilles Gouaillardet pointed out that the keyword in this case is MPI_ANY_SOURCE. With it, a process can receive a message from any source. To find out which process sent the message, read status.MPI_SOURCE from the status of the recv call.
MPI_Status status;
if (rank == 0) {
    // send initial work to all processes
    while (true) {
        MPI_Recv(buf, 32, MPI_INT, MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
        // do the distribution logic
        MPI_Send(buf, 32, MPI_INT, status.MPI_SOURCE, tag, MPI_COMM_WORLD);
        // break out of the loop once the work is over, and send all the
        // processes a message to stop waiting for work
    }
} else {
    while (true) {
        // receive work from rank 0
        MPI_Recv(buf, 32, MPI_INT, 0, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
        // perform the computation and send back the result
        MPI_Send(buf, 32, MPI_INT, 0, tag, MPI_COMM_WORLD);
        // break out of this loop when rank 0 asks, using some kind of special message
    }
}
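Putting this together, here is a minimal self-contained sketch of the master/worker loop built on MPI_ANY_SOURCE and status.MPI_SOURCE. It uses the tag of each work message to carry the chunk's starting index; CHUNK, STOP_TAG and the plain double payload are assumptions made for the example (a real version would ship the crazytaxi structs instead). Because rank 0 reacts to whichever worker reports back first, a slow chunk on one node never delays handing new chunks to the others.

#include <mpi.h>
#include <cmath>
#include <cstdio>
#include <vector>

// Tags used by this sketch (assumed names, not from the original post).
// STOP_TAG is chosen above any chunk offset and within the minimum tag
// range (32767) that every MPI implementation must support.
static const int CHUNK    = 10;     // elements handed out per request
static const int STOP_TAG = 30000;  // "no more work" marker

int main(int argc, char** argv)
{
    MPI_Init(&argc, &argv);
    int rank, size;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    const int SZ = 4200;            // assumed to be a multiple of CHUNK

    if (rank == 0) {
        std::vector<double> data(SZ), result(SZ, 0.0);
        for (int i = 0; i < SZ; i++) data[i] = 1.0 * i;

        int next = 0;     // start index of the next chunk to hand out
        int active = 0;   // workers currently holding a chunk
        double dummy = 0.0;

        // Prime every worker with one chunk; the message tag carries the offset.
        for (int w = 1; w < size; w++) {
            if (next < SZ) {
                MPI_Send(&data[next], CHUNK, MPI_DOUBLE, w, next, MPI_COMM_WORLD);
                next += CHUNK;
                active++;
            } else {
                MPI_Send(&dummy, 0, MPI_DOUBLE, w, STOP_TAG, MPI_COMM_WORLD);
            }
        }

        // Collect results from whichever worker finishes first; a worker that
        // reports back is by definition ready for its next chunk.
        while (active > 0) {
            double buf[CHUNK];
            MPI_Status st;
            MPI_Recv(buf, CHUNK, MPI_DOUBLE, MPI_ANY_SOURCE, MPI_ANY_TAG,
                     MPI_COMM_WORLD, &st);
            for (int k = 0; k < CHUNK; k++)
                result[st.MPI_TAG + k] = buf[k];
            if (next < SZ) {
                MPI_Send(&data[next], CHUNK, MPI_DOUBLE, st.MPI_SOURCE, next,
                         MPI_COMM_WORLD);
                next += CHUNK;
            } else {
                MPI_Send(&dummy, 0, MPI_DOUBLE, st.MPI_SOURCE, STOP_TAG,
                         MPI_COMM_WORLD);
                active--;
            }
        }

        for (int i = 0; i < SZ; i += 40)
            printf("Line %d: %lg\n", i, result[i]);
    } else {
        // Worker: receive a chunk, process it, send it back, repeat until told
        // to stop. Slow chunks simply mean this worker asks for work less often.
        while (true) {
            double buf[CHUNK];
            MPI_Status st;
            MPI_Recv(buf, CHUNK, MPI_DOUBLE, 0, MPI_ANY_TAG, MPI_COMM_WORLD, &st);
            if (st.MPI_TAG == STOP_TAG)
                break;
            for (int k = 0; k < CHUNK; k++)
                buf[k] = std::sqrt(buf[k]) / (st.MPI_TAG + k + 1.0);
            MPI_Send(buf, CHUNK, MPI_DOUBLE, 0, st.MPI_TAG, MPI_COMM_WORLD);
        }
    }

    MPI_Finalize();
    return 0;
}

Using the message tag to carry the offset keeps the payload a plain array; the offset could just as well be packed into the message itself if more metadata is needed per chunk.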