本文使用MPICH3.1.4,不介绍理论,只介绍用法.

MPI全称: Message Passing Interface
MPI采用简单暴力的多进程来实现并行。即将同样的程序拷贝给所有的处理器去执行。 没有冗余容错机制,也没有MapRecude。 唯一提供的就是进程间的通信。由于其灵活性很大,所以可以写出能发挥出硬件最大性能的程序。

Quick Start

MPICH

编译安装

1
2
3
4
5
wget http://www.mpich.org/static/downloads/3.1.4/mpich-3.1.4.tar.gz
tar zxf mpich-3.1.4.tar.gz
./configure --disable-fortran
make -j4
make install

MPICH VS OpenMPI

Hello World

下面这段程序中MPI_init()MPI_Finalize()中夹着的就是要并行执行的部分。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
//hello.c
#include <stdio.h>
#include <mpi.h>

int main(int argc, char *argv[])
{
    MPI_Init(&argc, &argv); //初始化MPI环境

    printf("Hello World!\n");

    MPI_Finalize(); //结束MPI环境
    return 0;
}

编译,然后执行它:

1
2
3
mpicc -o hello hello.c
mpirun -np 4 ./hello
mpiexec -n 4 ./hello

这之后会输出4个Hello World, 当然mpirunmpiexec你只要运行一个就OK了,这两句是等价的,你也可以修改开启的进程数目,即-n后的参数。关于mpirun和mpiexec的区别,戳这里
mpirun是MPI各个实现的启动MPI的方式,而mpiexec是MPI标准启动方式,使用mpiexec要更好一些。

基本的API

初始化。

1
2
int MPI_Init(int *argc, char*argv[]); //这句应该放在所有并行操作的最前面。
int MPI_Initialized(int *flag); //是否初始化, 结果保存在flag中

并行结束。

1
int MPI_Finalize();

rank和进程数目。

1
2
int MPI_Comm_size(MPI_Comm comm, int *size); //把comm通信下的processor的个数存入size中
int MPI_Comm_rank(MPI_Comm comm, int *rank); //当前进程的的标识存入rank中(0 ~ size-1)

comm为MPI_COMM_WORLD时,表示当前程序所有能用的进程。
一般,上面这两句在大部分MPI程序中都是紧挨着MPI_init()的。
使用这两个函数,可以方便的实现master-slave的并行模式。

1
2
3
if ( rank == 0 ) DoMaster();
else if ( rank == 1 ) DoSlave1();
else if ( rank == 2 ) DoSlave2();

把每个进程处理的函数写到对应的函数中将是一个明智的选择。

时间测定。

1
2
int MPI_Wtime(void); //程序此刻的时间戳
int MPI_Wtick(void); //两个时钟脉冲间隔

要测定一段程序的运行,把程序结束后和开始前的时间戳相减就能得到了。

P2P通信规范

上述的Hello World程序并不是MPI主要用法。毕竟通信才是MPI的关注重点。点对点通信就是只涉及到两个processor之间的消息传输。也就是sendrecv。首先,先看一段点对点通信的例子:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
#include <stdio.h>
#include <string.h>
#include <mpi.h>

int main(int argv, char* argc[]){
    int rank, tot, i;
    char msg[128], rev[128];
    MPI_Status state;
    MPI_Init(&argv, &argc);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &tot);
    if (rank == 0){
        for(i = 1; i < tot; i++){
            sprintf(msg,"hello, %d, this is zero, I'am your master", i);
            MPI_Send(msg, 128, MPI_CHAR, i, 0, MPI_COMM_WORLD);
        }
        for(i = 1; i < tot; i++){
            MPI_Recv(rev, 128, MPI_CHAR, i, 0, MPI_COMM_WORLD, &state);
            printf("P%d got: %s\n", rank, rev);
        }
    }else{
        MPI_Recv(rev, 128, MPI_CHAR, 0, 0, MPI_COMM_WORLD, &state);
        printf("P%d got: %s\n", rank, rev);
        sprintf(msg, "hello, zero, this is %d, I'am your slave", rank);
        MPI_Send(msg, 128, MPI_CHAR, 0, 0, MPI_COMM_WORLD);
    }
    MPI_Finalize();
    return 0;
}

这一段中使用MPI_Send()MPI_Recv来实现master和slave之间的hello。使用的通信是较为简单的阻塞式通信。而通信方式分为阻塞通信(Blocking)非阻塞通信(Non-blocking)
关于阻塞和非阻塞,区别无非就是调用后需不需要挂起等待。当然,更细致的区别正是这一段要重点讨论的。

阻塞通信

在阻塞通信中,有四种模式:

  • 标准通信模式(The Standard mode) 标准通信模式下。使不使用缓存。是由MPI本身决定的。缓冲区不是MPI的标准,但允许是MPI某个实现的一部分。
    如果有缓存,每个processor都有独立的一个的system buffer。它们是下图所示: system buffer
    讨论缓冲区时要区别System Buffer和Application Buffer System Buffer的特点是:

    对程序员透明,由MPI底层控制;
    资源有限,很容易耗尽;
    可以提高程序的性能,因为此时send/recv操作实际上是异步的;

    Application Buffer则是:

    供用户使用的内存。比如你定义变量使用的内存。

    对带有System Buffer的情况来说,阻塞发送实际上指的是成功将要发送的内容,拷贝到接收端的System Buffer中的这一过程。阻塞接收实际上是指向System Buffer发送请求到Application Buffer获得数据的过程。

    涉及到的API:

    1
    2
    3
    int MPI_Send(void *buff, int count, MPI_Datatype type, int dest, int tag, MPI_Comm comm);
    int MPI_Recv(void *buff, int count, MPI_Datatype type, int source, int tag, MPI_Comm comm, MPI_Status *status);
    MPI_Status status;
    

    其中(buff, count, type)三元组在所有涉及通信的过程中都有,而且都是最开始的三个参数。表示信息在内存中的指针,发送的数目,以及数据的类型。
    dest和source则表示信息的目的地和来源。而tag表示消息的一个标识。也有MPI_ANY_SOURCEMPI_ANY_TAG这两个值,表示接收所有source,所有tag的信息。comm这个参数同上不解释,status这个参数存储这接收到的信息的状态。

    1
    2
    3
    status.MPI_SOURCE  //信息来源
    status.MPI_TAG     //信息的Tag
    MPI_Get_count(MPI_Status *status, MPI_datatype type, int *count)  //多少个该类型的数据。
    

    对于数据类型,基本的类型有:

    1
    MPI_CHAR,MPI_DOUBLE,MPI_FLOAT,MPI_INT,MPI_LONG,MPI_LONG_DOUBLE,MPI_SHORT,MPI_UNSIGNED_CHAR,MPI_UNSIGNED,MPI_UNSIGNED_LONG,MPI_UNSIGNED_SHORT
    

    基本上看名字就知道在C中对应的类型。 再举个标准阻塞通信的例子:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
#include <stdio.h>
#include "mpi.h"

int main(int argc, char *argv[])  {
    int numtasks, rank, dest, source, rc, count, tag=1; 
    char inmsg, outmsg='x';
    MPI_Status Stat;

    MPI_Init(&argc,&argv);
    MPI_Comm_size(MPI_COMM_WORLD, &numtasks);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    if (rank == 0){
        dest = 1;
        source = 1;
        rc = MPI_Send(&outmsg, 1, MPI_CHAR, dest, tag, MPI_COMM_WORLD);
        rc = MPI_Recv(&inmsg, 1, MPI_CHAR, source, tag, MPI_COMM_WORLD, &Stat);
    }else if (rank == 1){
        dest = 0;
        source = 0;
        rc = MPI_Recv(&inmsg, 1, MPI_CHAR, source, tag, MPI_COMM_WORLD, &Stat);
        rc = MPI_Send(&outmsg, 1, MPI_CHAR, dest, tag, MPI_COMM_WORLD);
    }

    rc = MPI_Get_count(&Stat, MPI_CHAR, &count);
    printf("Task %d: Received %d char(s) from task %d with tag %d \n",
           rank, count, Stat.MPI_SOURCE, Stat.MPI_TAG);
    MPI_Finalize();
    return 0;
}
  • 同步通信模式(The Synchronous mode) 理解上述的标准模式,同步模式就好说了。所谓同步,指的是信息传输终止于接收方已经开始将数据拷贝到Application Buffer中去。只有一个API

    1
    MPI_Ssend(void *buff, int count, MPI_Datatype type, int dest, int tag, MPI_Comm comm);
    

    参数列表和MPI_Send一样。

  • 准备好通信模式(The Ready mode) 仅仅在程序员100%确定发送时接收方已经准备好时使用。不过在MPI的各种实现中基本上和标准的MPI_Send一样。

    1
    MPI_Rsend(void *buff, int count, MPI_Datatype type, int dest, int tag, MPI_Comm comm);
    
  • 缓存通信模式(The Buffered mode) 人工开辟一片System Buffer给程序员使用,使用方式就如同标准通信模式带缓存的情况。一般用来弥补System Buffer不足的缺点。

    1
    2
    3
    MPI_Bsend(void *buff, int count, MPI_Datatype type, int dest, int tag, MPI_Comm comma);
    MPI_Buffer_attach(void *buff, int size);
    MPI_Buffer_detach(void *buff, int size);
    

    MPI_Buffer_attach用来开辟一片缓存。size指的是字节数(bytes)。和MPI_Buffer_detach配套使用。

以上这四中模式中,准备好模式是最快的,然而很少使用。应为这种情况最少。标准模式和缓存模式也足够快,但增加了内存开销。一般来说用来应付较小的通信。同步模式最慢,但也最可靠。一般用来应付数据量大的通信。

非阻塞通信

非阻塞通信一般用来重叠通信和计算。为了最大限度压榨硬件而使用。
先介绍常用的API:

1
2
3
4
5
6
7
8
int MPI_Isend(void *buf, int count, MPI_Datatype datatype, int dest, int tag, MPI_Comm comm)
int MPI_Irecv(void *buf, int count, MPI_Datatype datatype,int source, int tag, MPI_Comm comm, MPI_Request *request)

MPI_Wait(MPI_Request *request, MPI_Status *status);
MPI_Waitall(int count, MPI_Request *array_of_requests[], MPI_Status *array_of_statuses[]);

MPI_Test(MPI_Request *request, int *flag, MPI_status *status);
MPI_Testall(int count, MPI_Request *array_of_requests[], int *flag, MPI_Status *array_of_statuses[]);

虽然非阻塞通信有诸如MPI_IssendMPI_IbsendMPI_Irsend不过基本和前面大同小异。所谓非阻塞模式指的是在发送和接收的函数调用后立即返回,不管到底有没有发送和接收到,而操作的正确性由Wait和Test类函数来保证。也就是说,在一个非阻塞操作执行之后,在使用得到的数据之前要先wait或Test一下来保证这个数据是已经可用的。举个例子:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
#include <stdio.h>
#include <unistd.h>
#include "mpi.h"

int main(int argc, char *argv[])  {
    int numtasks, rank, next, prev, buf[2], tag1=1, tag2=2;
    MPI_Request reqs[4];
    MPI_Status stats[4];
    buf[0] = -1;
    buf[1] = -1;
    MPI_Init(&argc,&argv);
    MPI_Comm_size(MPI_COMM_WORLD, &numtasks);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    prev = rank-1;
    next = rank+1;
    if (rank == 0)  prev = numtasks - 1;
    if (rank == (numtasks - 1))  next = 0;

    MPI_Irecv(&buf[0], 1, MPI_INT, prev, tag1, MPI_COMM_WORLD, &reqs[0]);
    MPI_Irecv(&buf[1], 1, MPI_INT, next, tag2, MPI_COMM_WORLD, &reqs[1]);

    printf("buffer before wait%d %d\n",buf[0],buf[1]);
    sleep(2);

    MPI_Isend(&rank, 1, MPI_INT, prev, tag2, MPI_COMM_WORLD, &reqs[2]);
    MPI_Isend(&rank, 1, MPI_INT, next, tag1, MPI_COMM_WORLD, &reqs[3]);

    MPI_Waitall(4, reqs, stats);
    printf("task %d got %d from prev\n",rank, buf[0]);
    printf("task %d got %d from next\n",rank, buf[1]);

    MPI_Finalize();
    return 0;
}

这里程序在执行MPI_Irev之后立即就返回了,程序继续执行,然而此时访问存储的数据发现并没有接收到。要到wait之后才确定。读者可以跑一下这个例子体会一下。

总结

集中对比一下阻塞式和非阻塞式通信的特点。
Blocking:

  1. A blocking send routine will only "return" after it is safe to modify the application buffer (your send data) for reuse. Safe means that modifications will not affect the data intended for the receive task. Safe does not imply that the data was actually received - it may very well be sitting in a system buffer.
  2. A blocking send can be synchronous which means there is handshaking occurring with the receive task to confirm a safe send.
  3. A blocking send can be asynchronous if a system buffer is used to hold the data for eventual delivery to the receive.
  4. A blocking receive only "returns" after the data has arrived and is ready for use by the program.

Non-blocking:

  1. Non-blocking send and receive routines behave similarly - they will return almost immediately. They do not wait for any communication events to complete, such as message copying from user memory to system buffer space or the actual arrival of message.
  2. Non-blocking operations simply "request" the MPI library to perform the operation when it is able. The user can not predict when that will happen.
  3. It is unsafe to modify the application buffer (your variable space) until you know for a fact the requested non-blocking operation was actually performed by the library. There are "wait" routines used to do this.
  4. Non-blocking communications are primarily used to overlap computation with communication and exploit possible performance gains. 上述内容摘抄自这里,我认为非常精辟。

顺序规则(Order Rules)

发送方连着发送两个消息给同一个接收方。先发的必被先收。
接收放连着请求两次同一个消息。先请求必先满足。
两个发送方给一个接收方发同样的消息,同时接收方只收一个,只有一个会被满足。(Starvation)

前提没有使用多线程。

集合通信规范

集合通信指的是包含在同一个communicator下的全部processor的通信。分3个类型:

  • Synchronization:强制所有processor都到达同步点。
  • Data Movement:数据交换,有broadcast,scaater,gather等。
  • Reductions:规约,将所有processor中的数据进行统计值计算。

一个集合通信的例子:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
#include "mpi.h"
#include <stdio.h>
#define SIZE 4

int main(int argc, char *argv[])  {
    int numtasks, rank, sendcount, recvcount, source;
    float sendbuf[SIZE][SIZE] = {
      {1.0, 2.0, 3.0, 4.0},
      {5.0, 6.0, 7.0, 8.0},
      {9.0, 10.0, 11.0, 12.0},
      {13.0, 14.0, 15.0, 16.0}  };
    float recvbuf[SIZE];

    MPI_Init(&argc,&argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &numtasks);

    if (numtasks == SIZE) {
      source = 1;
      sendcount = SIZE;
      recvcount = SIZE;
      MPI_Scatter(sendbuf,sendcount,MPI_FLOAT,recvbuf,recvcount,
                 MPI_FLOAT,source,MPI_COMM_WORLD);

      printf("rank= %d  Results: %f %f %f %f\n",rank,recvbuf[0],
             recvbuf[1],recvbuf[2],recvbuf[3]);
      }
    else
      printf("Must specify %d processors. Terminating.\n",SIZE);

    MPI_Finalize();
    return 0;
}

集体同步

强制所有processor都运行到函数调用时才继续执行之后的语句。

1
MPI_Barrier (MPI_Comm comm);

数据传输

brodcast,scatter,gather,reduction的区别见下: Data Movement

1
2
3
4
int MPI_Bcast(void *buffer, int count, MPI_Datatype datatype,int root, MPI_Comm comm);
int MPI_Scatter(void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm);
int MPI_Gather(void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm);
int MPI_Alltoall (void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf, int recvcnt, MPI_Datatype recvtype, MPI_Comm comm);

还有MPI_Allgather相当于每个processor都做一次gather。

规约

1
2
3
int MPI_Reduce(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype,op, int root, MPI_Comm comm);
int MPI_Op_create(MPI_User_function *function, int commute, MPI_Op *op);
//typedef void (MPI_User_function) ( void * a, void * b, int * len, MPI_Datatype * );

上述这个op就是规约的函数,用过MapReduce的肯定不陌生。

1
MPI_MAX,MPI_MIN,MPI_SUM ...

当然也可以自己定义函数MPI_Op_create,不过得对C的函数指针有些了解。

组和通信器

即Groups和Communicators。
一个Group对应一个Communicators,Group的目的是使你可以给task分组。然后可以在组内进行更加Safe的通信。通过MPI_Group_incl来创建新组,MPI_Comm_create来创建新的通信器。
例子:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
#include "mpi.h"
#include <stdio.h>
#define NPROCS 8

int main(int argc, char *argv[])  {
    int rank, new_rank, sendbuf, recvbuf, numtasks, ranks1[4]={0,1,2,3}, ranks2[4]={4,5,6,7};
    MPI_Group  orig_group, new_group;
    MPI_Comm   new_comm;

    MPI_Init(&argc,&argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &numtasks);

    if (numtasks != NPROCS) {
      printf("Must specify MP_PROCS= %d. Terminating.\n",NPROCS);
      MPI_Finalize();
      return 0;
    }

    sendbuf = rank;

    /* Extract the original group handle */
    MPI_Comm_group(MPI_COMM_WORLD, &orig_group);

    /* Divide tasks into two distinct groups based upon rank */
    if (rank < NPROCS/2) {
      MPI_Group_incl(orig_group, NPROCS/2, ranks1, &new_group);
      }
    else {
      MPI_Group_incl(orig_group, NPROCS/2, ranks2, &new_group);
      }

    /* Create new new communicator and then perform collective communications */
    MPI_Comm_create(MPI_COMM_WORLD, new_group, &new_comm);
    MPI_Allreduce(&sendbuf, &recvbuf, 1, MPI_INT, MPI_SUM, new_comm);

    MPI_Group_rank (new_group, &new_rank);
    printf("rank= %d newrank= %d recvbuf= %d\n",rank,new_rank,recvbuf);
    MPI_Finalize();
    return 0;
}

衍生类型

MPI提供API给程序员来创造新的MPI数据类型。一般有4种方法来创建派生类型。

  • Contiguous
  • Vector
  • Indexed
  • Struct 即使你要创建的数据不是连续的。经过派生类型的转化后,对于MPI可以看做是连续的。实际上,MPI通过TypeMap来保存新的数据类型。
    TypeMap = {(type0, disp0), (type1 , disp1), ... , (typen-1, dispn-1)}
    其中type表示这个块中的数据原始类型,disp表示其在内存中的偏移量。
    主要的API如下:
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
MPI_Type_contiguous (count,oldtype,&newtype) 
//count个连续的oldtype构成新的type

MPI_Type_vector (count,blocklength,stride,oldtype,&newtype)
//count个block,每个block有blocklength个oldtype,每两个相邻的block直接隔着stride个oldtype。

MPI_Type_indexed (count,blocklens[],offsets[],old_type,&newtype)
//count个block,每个block的len对应blocklens[]中的值,每个block开始的offset(oldtype)对应于offsets[]中的值。
//这可以拿来构造三角矩阵。


MPI_Type_struct (count,blocklens[],offsets[],old_types,&newtype)
//count个block,每个block的len对应blocklens[]中的值,每个block开始的offset(bytes)对应于offsets[]中的值,每个块对应的oldtype来自oldtypes中的值
//较为常用的一种方法。

MPI_Type_commit (&datatype)
//在使用前先commit

MPI_Type_extent (datatype,&extent)
//返回datatype所占的字节数。
//一个工具类函数

MPI_Type_free (&datatype)
//你猜?

给一个构建struct的例子:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
#include "mpi.h"
#include <stdio.h>
#define NELEM 25

main(int argc, char *argv[])  {
int numtasks, rank, source=0, dest, tag=1, i;

typedef struct {
  float x, y, z;
  float velocity;
  int  n, type;
  }          Particle;
Particle     p[NELEM], particles[NELEM];
MPI_Datatype particletype, oldtypes[2]; 
int          blockcounts[2];

/* MPI_Aint type used to be consistent with syntax of */
/* MPI_Type_extent routine */
MPI_Aint    offsets[2], extent;

MPI_Status stat;

MPI_Init(&argc,&argv);
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &numtasks);

/* Setup description of the 4 MPI_FLOAT fields x, y, z, velocity */
offsets[0] = 0;
oldtypes[0] = MPI_FLOAT;
blockcounts[0] = 4;

/* Setup description of the 2 MPI_INT fields n, type */
/* Need to first figure offset by getting size of MPI_FLOAT */
MPI_Type_extent(MPI_FLOAT, &extent);
offsets[1] = 4 * extent;
oldtypes[1] = MPI_INT;
blockcounts[1] = 2;

/* Now define structured type and commit it */
MPI_Type_struct(2, blockcounts, offsets, oldtypes, &particletype);
MPI_Type_commit(&particletype);

/* Initialize the particle array and then send it to each task */
if (rank == 0) {
  for (i=0; i<NELEM; i++) {
     particles[i].x = i * 1.0;
     particles[i].y = i * -1.0;
     particles[i].z = i * 1.0; 
     particles[i].velocity = 0.25;
     particles[i].n = i;
     particles[i].type = i % 2; 
     }
  for (i=0; i<numtasks; i++) 
     MPI_Send(particles, NELEM, particletype, i, tag, MPI_COMM_WORLD);
  }

MPI_Recv(p, NELEM, particletype, source, tag, MPI_COMM_WORLD, &stat);

/* Print a sample of what was received */
printf("rank= %d   %3.2f %3.2f %3.2f %3.2f %d %d\n", rank,p[3].x,
     p[3].y,p[3].z,p[3].velocity,p[3].n,p[3].type);

MPI_Type_free(&particletype);
MPI_Finalize();
}

其他

最后再给个例子。也是很多MPI教程常见的利用积分计算PI的值:

$$\pi=\int_0^1{\frac{4}{1+x^2}dx}$$

PI

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
#include <math.h>
#include "mpi.h"
#include <stdio.h>

double f(double a){
    return (4.0 / (1.0 + a * a));
}

int main(int argc, char* argv[]){
    int ProcRank, ProcNum, done = 0, n = 0, i;
    double PI25DT = 3.141592653589793238462643;
    double mypi, pi, h, sum, x, t1, t2;
    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &ProcNum);
    MPI_Comm_rank(MPI_COMM_WORLD, &ProcRank);
    while(!done){
        if (ProcRank == 0){
            printf("Enter the number of intervals: ");
            scanf("%d",&n);
            t1 = MPI_Wtime();
        }
        MPI_Bcast(&n, 1, MPI_INT, 0, MPI_COMM_WORLD);
        if(n > 0){
            h = 1.0/ n ;
            sum = 0.0;
            for(i = ProcRank + 1; i <= n; i += ProcNum){
                x = h * ((double)i -0.5);
                sum += f(x);
            }
            mypi = h*sum;
            MPI_Reduce(&mypi, &pi, 1, MPI_DOUBLE, MPI_SUM, 0, MPI_COMM_WORLD);
            if(ProcRank == 0){
                t2 = MPI_Wtime();
                printf("pi is approximately %.16f, Error is %.16f\n", pi, fabs(pi-PI25DT));
                printf("wall clock time = %f\n", t2-t1);
            }

        }else done = 1;
    }
    MPI_Finalize();
    return 0;
}

2015年8月15日by septicmk。
若文中有误,请务必指出:)
注:
主要参考自这篇教程,很多概念出自其中。
还有这个ppt,也非常实用,密码: 5r2v。