针对可变复杂性任务或可变速度节点的负载平衡MPI多线程?

问题描述

我编写了一个MPI代码,该代码当前通过将每个数组中相等数量的元素发送到不同的进程来工作来进行多线程处理(因此,对于6个工作程序,该数组分为6个相等的部分)。我想做的是仅在工作人员准备接收时才发送小块,并在不阻止将来发送的情况下接收完成的块;这样,如果一个块花费10秒,而另一个块花费1秒,则可以在等待长块完成的同时处理其他数据。

这是我整理的一些基本代码

#include <mpi.h>
#include <iostream>
#include <vector>
#include <cmath>

struct crazytaxi
{
    double a = 10.0;
    double b = 25.2;
    double c = 222.222;
};

int main(int argc,char** argv)
{
    //Initial and temp kanno vectors
    std::vector<crazytaxi> kanno;
    std::vector<crazytaxi> kanno_tmp;

    //init MPI
    MPI_Init(NULL,NULL);

    //allocate vector
    int SZ = 4200;
    kanno.resize(SZ);

    int world_size;
    MPI_Comm_size(MPI_COMM_WORLD,&world_size);

    int world_rank;
    MPI_Comm_rank(MPI_COMM_WORLD,&world_rank);

    if (world_rank == 0)
    {
        for (int i = 0; i < SZ; i++)
            kanno[i].a = 1.0*i;
            kanno[i].b = 10.0/(i+1);
    }

    for (int j = 0; j < 10; j++) {

        //Make sure all processes have same kanno vector;
        if (world_rank == 0) {
            for (int i = 1; i < world_size; i++)
                MPI_Send(&kanno[0],sizeof(crazytaxi)*kanno.size(),MPI_BYTE,i,3,MPI_COMM_WORLD);
        } else {
            MPI_Recv(&kanno[0],MPI_COMM_WORLD,MPI_STATUS_IGnorE);
        }

        //copy to tmp vector
        kanno_tmp = kanno;
        MPI_Barrier();

        //the sender
        if (world_rank == 0) {
            unsigned p1 = 0;
            unsigned segment = 10;
            unsigned p2 = segment;
            while (p1 < SZ) {
                for (int i = 0; i < world_size; i++) {
                    //if (process #i is ready to receive)
                        //Send data in chunks of 10 to i
                    //else
                        //continue
                }
            }
        }
        if (world_rank != 0) {
            //Receive data to be processed
            //do some math
            for (unsigned i = p1; i < p2; i++)
                kanno_tmp[i].a = std::sqrt(kanno[i].a)/((double)i+1.0);

            //Send processed data to 0 and wait to receive new data.
        }

        //copy temp vector to kanno
        kanno = kanno_tmp;
    }

    //print some of the results;
    if (world_rank == 0)
    {
        for (int i = 0; i < SZ; i += 40)
            printf("Line %d: %lg,%lg\n",kanno[i].a,kanno[i].b);
    }

    MPI_Finalize();
}

除了我的MPI_Send和MPI_Recv调用将被阻止,或者“主”进程不知道“从”进程已准备好接收数据之外,我可以90%将其转换为所需的内容

MPI中是否可以做类似的事情

unsigned Datapointer = [some_array_index];
while (Datapointer < array_size) {
    if (world_rank == 0) {
        for (int i = 1; i < world_size; i++)
        {
            if (<process i is ready to receive>) {
                MPI_Send([...]);
                Datapointer += 10;
            }
            if (<process i has sent data>)
                MPI_Recv([...]);
            if (Datapointer > array_size) {
                MPI_Bcast([killswitch]);
                break;
            }
        }
    }
}
MPI_Barrier();

或者对于可变复杂性块或可变速度节点,有没有更有效的方法来构造它?

解决方法

@Gilles Gouaillardet 指出,在这种情况下,关键字为MPI_ANY_SOURCE。使用它,进程可以从任何来源接收消息。要知道哪个进程发送该消息,可以在status.MPI_SOURCE调用的状态上使用recv

MPI_Status status;
if(rank == 0) {
  //send initial work to all processes
  while(true) {
    MPI_recv(buf,32,MPI_INT,MPI_ANY_SOURCE,MPI_ANY_TAG,MPI_COMM_WORLD,&status);
    // do the distribution logic
    MPI_send(buf,status.MPI_SOURCE,tag,MPI_COMM_WORLD);
    // break out of the loop once the work is over and send all the processes 
    message to stop waiting for work
  }
}
else {
  while(true){
    // receive work from rank 0
    MPI_recv(buf,&status);
    // Perform computation and send back the result
    MPI_send(buf,MPI_COMM_WORLD);
    //break this until asked by master 0 using some kind of special message
  }
}