本文是关于并行计算的学习,根据具体的实现方式不同,并行计算主要考虑两类方式:

  • 以 MPI 为代表的多进程,不共享内存,进程之间通过消息传递机制进行通信;
  • 以 OpenMP 为代表的多线程,共享内存;

本文主要针对的是 MPI,使用 C++ 接口。后续还可能有关于 OpenMP 和专门的 C++ 多线程库std::thread的学习。

多进程与多线程

对于系统而言,进程是资源分配的最小单位,直接点击可执行文件一般就会开启一个进程,执行当前的应用程序,分配独立的内存空间供其使用;线程是 CPU 调度的最小单位,线程必须隶属于进程而存在,一个进程默认拥有一个线程,也可以拥有多个线程,线程之间需要共享同一个内存空间。

对于多进程的直观理解,可以类比微信多开;对于多线程的理解,例如一个带 GUI 的科学计算器程序,至少拥有两个线程,其中一个线程处于等待状态,负责接收用户的界面操作信息,给出直观的反馈,另一个线程负责执行耗时的计算,防止计算任务对整个程序的运行造成阻塞。(如果只有一个线程,则执行耗时的计算过程中,图形界面就一直是卡死的状态)

多进程的编程中,关键是如何相互传递消息,因为默认情形下两个进程没有任何关系;多线程的编程中,关键是如何保护数据,因为多个线程在同一个内存空间中进行读写,难免相互影响。多线程中任一线程的崩溃会影响所有线程,而多进程中不同进程之间几乎不受任何影响。

MPI 简介

MPI 是一个跨语言的通讯协议,在并行计算中,MPI 提供了多进程之间传递消息的机制,支持进程之间点对点的通信,或者点对全体的广播通信。

MPI 只是一个协议/标准,要求提供 Fortran 和 C/C++语言的接口,标准只明确了各个函数接口的名称,参数以及功能,却不负责具体的底层实现,把实现留给了第三方负责。mpich,msmpi(微软),openmpi 等都是不同组织在不同平台上对 MPI 的具体实现。(msmspi 有很多 bug:Fortran 的接口可能有问题,CMake 也找不到这个库)我们使用 C++和 msmpi 完成接下来的内容。

MPI 的 HelloWorld

首先从 MPI 版本的 HelloWorld 开始,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#include "mpi.h"
#include <iostream>

int main(int argc, char *argv[])
{
int rank, size, len;
char version[MPI_MAX_LIBRARY_VERSION_STRING];

MPI_Init(&argc, &argv);
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &size);
MPI_Get_library_version(version, &len);

std::cout << "Hello, world! I am " << rank << " of " << size
<< "(" << version << ", " << len << ")" << std::endl;

MPI_Finalize();
return 0;
}

编译的时候需要包含相应的头文件目录,链接相应的库,直接使用find_package(MPI REQUIRED)压根找不到 msmpi。 我在 CMakeLists.txt 中,使用下面的命令获取了头文件目录和库的绝对路径:

1
2
3
4
5
set(MPI_LIBRARIES "$ENV{MSMPI_LIB64}msmpi.lib")
set(MPI_CXX_INCLUDE_DIRS "$ENV{MSMPI_INC}")

string(REPLACE "\\" "/" MPI_LIBRARIES ${MPI_LIBRARIES})
string(REPLACE "\\" "/" MPI_CXX_INCLUDE_DIRS ${MPI_CXX_INCLUDE_DIRS})

运行的时候需要特殊的命令,如果直接运行 exe 则相当于 MPI 没有发挥作用,必须使用如下命令运行编译后的a.exe

1
mpiexec -n 4 ./a.exe

上述命令中 mpiexec 对于别的 MPI 实现,可能会换成 mpirun,但是效果都是一样的,需要指定同时开启几个进程,上述命令开启了 4 个进程,运行结果如下:

1
2
3
4
Hello, world!  I am 0 of 4(Microsoft MPI 10.1.12498.18, 27)
Hello, world! I am 2 of 4(Microsoft MPI 10.1.12498.18, 27)
Hello, world! I am 1 of 4(Microsoft MPI 10.1.12498.18, 27)
Hello, world! I am 3 of 4(Microsoft MPI 10.1.12498.18, 27)

上述程序的运行发生了什么:我们同时在 4 个进程中运行 a.exe,

  • 在 main 函数中,它们首先通过MPI_Init完成 MPI 环境的初始化,
  • 在 main 函数的内部,它们各自
    • 通过MPI_Comm_size知道当前 MPI 环境里有多少个进程在同时运行,
    • 通过MPI_Comm_rank获取当前进程在整个 MPI 环境里的编号,
    • 通过MPI_Get_library_version获取当前 MPI 库的版本号,例如上述版本就是微软实现的 Microsoft MPI 10.1.12498.18,27 只是表明这个版本号的字符串长度。
  • 每个进程都向屏幕输出了"Hello, world!"字符串,以及自己进程的编号,可以发现,它们是乱序的,并且每一次运行得到的顺序也很可能不同。
  • 最后会在 main 函数退出前通过MPI_Finalize统一结束,如果缺少这条命令程序会报错或崩溃,需要避免程序在没有经过MPI_Finalize时突然结束。

HelloWorld 的随机乱序输出体现了多进程的特点:进程之前本就没有什么关系,有的运行的快,有的运行的慢,是混乱无序的。MPI 环境就是为了实现多个进程之间的协同,可以抽象地理解为,有一个领导者在调控多个进程,每个进程从它那里获取总的进程数,获取自己的编号,统一开始,统一结束。

MPI 消息传递

上面的例子只是给出了 MPI 编程必要的框架,却没有体现 MPI 的精华之处:消息传递,因此我们在它的基础上添加如下功能:

  • 在输出 helloworld 字符串的前后计时,获取时间差;
  • 所有进程把时间差的数据发送给编号为 0 的进程,由 0 进程汇总并输出。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
#include "mpi.h"
#include <iostream>

int main(int argc, char *argv[])
{
int rank, size, len;
char version[MPI_MAX_LIBRARY_VERSION_STRING];

MPI_Init(&argc, &argv);
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &size);
MPI_Get_library_version(version, &len);

double time_1 = MPI_Wtime();

std::cout << "Hello, world! I am " << rank << " of " << size
<< "(" << version << ", " << len << ")" << std::endl;

double time_2 = MPI_Wtime();
double dt = time_2 - time_1;

if (rank > 0) {
// 其他进程向0进程发送数据
MPI_Send(&dt, 1, MPI_DOUBLE, 0, rank, MPI_COMM_WORLD);
}

if (rank == 0) {
// 0进程收集其他进程的消息
auto p = new double[size]; // 创建长度为size的double数组
MPI_Status status; // 这是一个状态标识,用于记录是否成功接收到数据

p[0] = dt; // 记录自己的时间差
// 收集其他进程发送过来的时间差
for (int id = 1; id < size; id++) {
MPI_Recv(p + id, 1, MPI_DOUBLE, id, id, MPI_COMM_WORLD, &status);
}

// 输出所有进程的时间差
for (int id = 0; id < size; id++) {
std::cout << "time of (" << id << ") = " << *(p + id) << std::endl;
}

delete[] p; // 释放内存
}

MPI_Finalize();
return 0;
}

运行命令不变,同样开启 4 个进程,运行结果如下

1
2
3
4
5
6
7
8
Hello, world!  I am 1 of 4(Microsoft MPI 10.1.12498.18, 27)
Hello, world! I am 0 of 4(Microsoft MPI 10.1.12498.18, 27)
Hello, world! I am 2 of 4(Microsoft MPI 10.1.12498.18, 27)
Hello, world! I am 3 of 4(Microsoft MPI 10.1.12498.18, 27)
time of (0) = 4.56e-05
time of (1) = 8.19e-05
time of (2) = 4.68e-05
time of (3) = 7.24e-05

我们关注两个新功能的实现, 首先,我们使用 MPI 提供的计时函数MPI_Wtime,获取时间差 dt,

1
2
3
4
5
6
7
double time_1 = MPI_Wtime();

std::cout << "Hello, world! I am " << rank << " of " << size
<< "(" << version << ", " << len << ")" << std::endl;

double time_2 = MPI_Wtime();
double dt = time_2 - time_1;

然后对于编号不为 0 的进程,进入如下分支

1
2
3
4
if (rank > 0) {
// 其他进程向0进程发送数据
MPI_Send(&dt, 1, MPI_DOUBLE, 0, rank, MPI_COMM_WORLD);
}

这个语句调用了MPI_Send函数:把 dt 作为一个 double 类型的数据打包,从当前 rank 进程发送给 0 进程。(为了和 Fortran 保持一致,MPI 在 C/C++中对于参数也是主要使用传址调用的形式)

对于编号为 0 的进程,它会进入如下分支,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
if (rank == 0) {
// 0进程收集其他进程的消息
auto p = new double[size]; // 创建长度为size的double数组
MPI_Status status; // 这是一个状态标识,用于记录是否成功接收到数据

p[0] = dt; // 记录自己的时间差
// 收集其他进程发送过来的时间差
for (int id = 1; id < size; id++) {
MPI_Recv(p + id, 1, MPI_DOUBLE, id, id, MPI_COMM_WORLD, &status);
}

// 输出所有进程的时间差
for (int id = 0; id < size; id++) {
std::cout << "time of (" << id << ") = " << *(p + id) << std::endl;
}

delete[] p; // 释放内存
}

这个部分调用了MPI_Recv函数:依次从编号为 1 到 size-1 的进程,接收一个 double 类型的数据,存储到 p 指针对应的动态内存中。

MPI 完备子集

MPI 标准提供了一系列的函数,变量,类型等,用于完成消息通信的目标功能,在命名规则上有一些惯例:

  • MPI 提供的所有名称都以MPI_开头,无论是函数,变量还是类型。
  • 函数名称的大小写一般为MPI_Abc_def的形式。

MPI 提供了一两百个MPI_开头的函数接口,但是我们并不需要这么多,并且它们大多数功能类似,只有细微的差别,我们只需要关注其中最核心的六个接口,由它们构成了 MPI 的完备子集。(我们只关注 C++的接口,Fortran 版本在参数上有很大的不同)

  • MPI_Init MPI 初始化
  • MPI_Finalize MPI 结束
  • MPI_Comm_rank 获取当前进程在 MPI 通信域中的编号
  • MPI_Comm_size 获取当前 MPI 通信域的进程数
  • MPI_Send 发送消息
  • MPI_Recv 接收消息

MPI_Init 和 MPI_Finalize

这两个其实没啥好说的,是每个 MPI 程序必要的两条语句,在 main 函数之后以及整个程序退出之前都需要使用,否则可能出现无法预见的错误。 MPI_Init 在 mian 函数之后立刻调用,以指针的形式接收 main 函数的参数。MPI_Finalize 在程序退出之前使用,无参数。

1
2
3
4
5
6
7
8
9
10
// 函数原型
int MPI_Init(const int *argc, char ***argv);
// argc和argv都是main函数的参数(输入参数)
// 这里需要以指针的形式传递给MPI_Init

int MPI_Finalize();

// 调用
MPI_Init(&argc, &argv);
MPI_Finalize();

MPI_Comm_rank 和 MPI_Comm_size

这两条语句是消息传递的基础:每个进程都至少需要知道自己是谁,自己可以向哪些进程发送消息。进程编号是从 0 开始的非负整数,在 MPI 环境下,每个进程会被分配到唯一的固定的一个编号,直到程序结束。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 函数原型
int MPI_Comm_rank(MPI_Comm comm, int *rank);
// 第一个参数comm MPI通信域(输入参数)
// 第二个参数rank 当前进程的编号,以整数指针的形式(输出参数)

int MPI_Comm_size(MPI_Comm comm, int *size);
// 第一个参数comm MPI通信域(输入参数)
// 第二个参数size 当前通信域包含的进程数(输出参数)

// 调用
// 我们首先要定义两个整数变量,名称可以任意,然后把它们的地址传递过去
int rank, size;
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &size);

两个函数都涉及到通信域的概念,但是我们不用管它,直接使用默认的全局通信域MPI_COMM_WORLD作为第一个参数。

MPI_Send 和 MPI_Recv

发送消息的函数在实现发送消息的功能时,我们至少需要下列参数:

  • 收件人(接收消息的进程编号)
  • 消息本体(发送数据缓存区,指向一段连续内存的指针)
  • 消息的特点(数据的类型,数据的个数)
  • 消息的标识
  • 通信域

函数原型如下,

1
2
3
4
5
6
7
8
9
// 函数原型
int MPI_Send(const void *buf, int count, MPI_Datatype datatype,
int dest, int tag, MPI_Comm comm)
// (输入参数)buf 发送数据缓存区
// (输入参数)count 发送数据长度(不是字节数)
// (输入参数)datatype 发送数据类型,例如double对应为MPI_DOUBLE,见下文
// (输入参数)dest 接收进程编号
// (输入参数)tag 消息标识(任一整数即可,但注意接收时需要提供相同的消息标识)
// (输入参数)comm 通信域

函数的使用例如

1
2
3
4
5
6
7
MPI_Send(&dt, 1, MPI_DOUBLE, 0, rank, MPI_COMM_WORLD);
// buf=&dt 取dt所在的地址作为发送数据缓存区
// count=1 表示只有一个数据
// datatype=MPI_DOUBLE 表示数据是double类型
// dest=0 表示接收进程的编号为0
// tag=rank 表示以当前进程编号作为此次消息的标识数
// comm=MPI_COMM_WORLD 使用默认的全局通信域

表示在全局通信域下,把 dt 作为一个 double 类型的数据,传递给编号为 0 的进程,消息标识为当前进程的编号。

对于数据类型,需要使用 MPI 预先定义好的数据类型名称,例如

  • MPI_DOUBLE = double
  • MPI_FLOAT = float
  • MPI_INT = signed int
  • MPI_UNSIGNED = unsigned int
  • MPI_UNSIGNED_LONG = unsigned long int

与发送消息类似,接收消息也需要很多参数:

  • 发件人(发送消息的进程编号)
  • 消息本体(接收数据缓存区,指向一段连续内存的指针)
  • 消息的特点(数据的类型,数据的个数)
  • 消息的标识
  • 通信域
  • 接收状态(反馈当前进程是否顺利接收消息)

函数原型如下

1
2
3
4
5
6
7
8
9
10
// 函数原型
int MPI_Recv(void *buf, int count, MPI_Datatype datatype,
int source, int tag, MPI_Comm comm, MPI_Status *status)
// (输入输出参数)buf 接收数据缓存区
// (输入参数)count 接收数据长度(不是字节数)
// (输入参数)datatype 接收数据类型,例如double对应为MPI_DOUBLE
// (输入参数)source 发送进程编号
// (输入参数)tag 消息标识(与发送时需要提供相同的消息标识)
// (输入参数)comm 通信域
// (输出参数)status 接收的状态,可以通过读取这个变量确定是否顺利接收消息

函数的使用例如

1
2
3
4
5
6
7
8
9
10
11
MPI_Status status; // 先定义一个状态变量
for (int id = 1; id < size; id++) {
MPI_Recv(p + id, 1, MPI_DOUBLE, id, id, MPI_COMM_WORLD, &status);
// buf=p+id 把p[id]位置作为接收数据缓存区
// count=1 只接收一个数据
// datatype=MPI_DOUBLE 接收数据类型是double
// source=id 发送进程的编号为id
// tag=id 消息的标识为id
// comm=MPI_COMM_WORLD 使用默认的全局通信域
// status=&status 把接收状态记录到status中
}

不严谨地说,基于上述六个函数,我们就可以实现 MPI 主要的功能,而不需要去理会消息传递的细节,对于更加多样化的需求,也可以自行在这六个函数的基础上进行封装。但是 MPI 同时也提供给了我们一些便利的接口,用于实现更丰富的消息传递功能。

MPI 进阶

消息传递过程

在 MPI 消息传递的整个过程中,实际上存在 3 个步骤:

  • 消息装配:从发送缓存区取出数据,形成一个消息;
  • 消息传递:将消息从发送端传递给接收端;
  • 消息拆卸:把接收到的消息写入接收缓存区。

在发送和接收的过程中,MPI 要求数据类型需要完全匹配。

我们考虑从 A 进程向 B 进程先后发送消息 m1,m2 的情景,MPI 不会保证消息到达 B 的顺序,为了避免错误接收,需要使用消息标识来区分 m1 和 m2,到达的消息会等待 B 的接收语句。

通常,我们需要使用通信域+发送者/接收者+消息标识来确定一个消息,但是可以使用下面的通配符进行更自由的接收:

  • MPI_ANY_SOURCE 表示允许任何消息的来源,但是其他条件仍然需要满足;
  • MPI_ANY_TAG 表示允许任何消息标记,但是其他条件仍然需要满足。

对于信息的发送,必须要指定一个接收者,指定一个消息标识(如果不会产生歧义的话,可以使用同一个整数标记不同的消息),为了编程语句的一致性,还可以向一个不存在的虚拟进程MPI_PROC_NULL发送消息(实质上是编号为-1 的不存在的进程),相当于无效语句,不会执行发生动作。

异常处理

我们可以使用MPI_Initialized函数来确保已经成功进行初始化。

1
2
3
int MPI_Initialized(int *flag);
// (输出参数) flag
// 如果已经初始化,flag=true,否则为false

如果当前进程发生了异常,需要直接关闭所有进程退出,可以使用MPI_Abort函数执行错误退出,并且关闭处于一个通信域的其他进程。

1
2
3
int MPI_Abort(MPI_Comm comm, int errorcode);
// (输入参数) comm 通信域
// (输入参数) errorcode 错误码

通信顺序与死锁

在多个进程之间的通信中,可能因为顺序不当产生死锁,使得程序陷入无尽的等待。例如下图是两个进程之间的死锁,双方首先都在执行接收语句

除了死锁之外,还有一种不安全的模式,双方首先都在执行发送语句,如下图,此时消息到达目的进程后不会被立刻拆卸,而是留在了系统缓存区——这是不安全的,MPI 无法保证有充足的系统缓存区,以及消息在系统缓存区的安全性。

一种安全的模式是一方先发送后接收,另一方先接收后发送。

通信模式

MPI 提供了四种通信模式,前文涉及的都是标准通信模式,还有其他三种通信模型,见下表。

通信模式 发送 接收
标准通信模式 MPI_Send MPI_Recv
缓存通信模式 MPI_Bsend 同上
同步通信模式 MPI_Ssend 同上
就绪通信模式 MPI_Rsend 同上

MPI 在后台维持了一个独立的进程,并准备了一个缓存区,我们不妨称之为后台缓存区。 标准模式下的消息是否使用后台缓存区是由 MPI 自行决定的(通常取决于消息大小是否超过 MPI 预留的后台缓存区),程序员无法直接掌控,我们以进程 A 将要把消息 m 传递给进程 B 为例,在标准通信模式下:

  • 如果 MPI 决定把消息 m 缓存,那么 A 的发送函数把消息拷贝到后台缓存区之后可以立刻返回,由 MPI 后台负责消息的发送,进程 A 可以继续执行它的后续语句,而 B 在接收消息时从后台缓存区中读取消息 m,读取完成后 B 继续执行它的后续语句。(发送进程不依赖接收进程)
  • 如果 MPI 决定不缓存消息 m,那么 A 的发送函数需要等待 B 执行到接收函数时,开始发送,直到消息 m 被完整传递给 B 之后,B 的接收函数执行完成返回,A 的发送函数也执行完成返回,两者可以继续执行它们的后续语句。(发送进程依赖于接收进程)

如果对标准通信模式不满意,则可以使用其他三种模式。这方面没看懂参考教材,网上的教程也都是基于这本书的,因此比较混乱。