系列文章目录
- 初探MPI——MPI简介
- 初探MPI——(阻塞)点对点通信
- 初探MPI——(非阻塞)点对点通信
- 初探MPI——集体通信
文章目录
- 系列文章目录
- 前言
- 一、Sending & Receiving message
- 1.1 简介
- 1.2 发送消息
- 1.3 接收消息
- 1.4 MPI 发送和接收方法的定义
- 二、练习
- 2.1 一个简单的通信1
- 2.2 一个简单的通信2
- 2.3 乒乓程序
- 三、Dynamic Receiving with `MPI Probe` (and `MPI Status`)
- 3.1 `MPI_Status` 结构体查询的示例
- 3.2 使用 `MPI_Probe`找出消息大小
- 五、(阻塞)点对点通信应用——随机游走(random walk)
- 5.1 问题简述
- 5.2 使用 `MPI_Send` 和 `MPI_Recv` 组织代码
- 5.2.1 第一次尝试(有死锁风险)
- 5.2.2 死锁及预防
- 5.2.3 确定每个walker的结束时间
- 5.2.4 最终代码
- 总结
- Appendix
- 1. `std::vector::data()`
- 参考
前言
在上一篇文章中,我们已经了解了如何初始化 MPI、获取进程的等级和通信器的大小。但就目前而言,我们没有机会让流程进行通信,而这正是 MPI 的精髓所在。
有两种类型的通信,点对点(从现在开始我们将称之为 P2P)和集体通信。P2P通信分为两个操作:发送和接收。
P2P通信的最基本形式称为阻塞通信。发送消息的进程将等待,直到接收进程完成接收所有信息。这是最简单的沟通方式,但不一定是最快的。
一、Sending & Receiving message
1.1 简介
MPI 的发送和接收方法是按以下方式进行的:开始的时候,A 进程决定要发送一些消息给 B 进程。A进程就会把需要发送给B进程的所有数据打包好,放到一个缓存里面。因为所有数据会被打包到一个大的信息里面,因此缓存常常会被比作信封(就像我们把好多信纸打包到一个信封里面然后再寄去邮局)。数据打包进缓存之后,通信设备(通常是网络)就需要负责把信息传递到正确的地方。这个正确的地方也就是根据特定秩确定的那个进程。
尽管数据已经被送达到 B 了,但是进程 B 依然需要确认它想要接收 A 的数据。一旦它确定了这点,数据就被传输成功了。进程 A 会接收到数据传递成功的信息,然后去干其他事情。
有时候 A 需要传递很多不同的消息给 B。为了让 B 能比较方便地区分不同的消息,MPI 运行发送者和接受者额外地指定一些信息 ID (正式名称是标签, tags)。当 B 只要求接收某种特定标签的信息的时候,其他的不是这个标签的信息会先被缓存起来,等到 B 需要的时候才会给 B。
1.2 发送消息
A send operation, sends a buffer of data of a certain type to another process. A P2P message has the following properties :
-
A reference to a buffer 引用将始终是指向缓冲区的指针。此数组将保存希望从当前进程发送到另一个进程的数据。
-
A number of elements
The number of elements in the buffer that you want to send to the destination. -
A datatype 数据类型必须与缓冲区中存储的数据精确对应。为此,MPI 具有可以使用的预定义类型。最常见的类型与其 C 对应是:
还有许多其他的类型, 可以在 MPI standard documentation找到. 也可以自定义属于你自己程序所需要的类型,这里暂时不介绍。 -
A destination id
The rank of the process you want to send the data to. -
A tag 标签
在MPI中,tag参数在MPI_Send函数中用于标识消息的类型。它是一个整数,可以被发送方和接收方用来区分不同种类的消息。当你的程序在同一个通信通道(communicator)中发送和接收多种不同类型的数据时,tag就显得非常有用。 -
A communicator
要将数据发送到的通信器。请记住,进程的rank可能会根据选择的通信器而变化。
1.3 接收消息
消息的接收方式与发送操作完全相同。但是,调用将需要源 ID (source id)而不是目标 ID,正在等待消息的进程的标识。
1.4 MPI 发送和接收方法的定义
MPI_Send(
void* data,
int count,
MPI_Datatype datatype,
int destination,
int tag,
MPI_Comm communicator)
MPI_Recv(
void* data,
int count,
MPI_Datatype datatype,
int source,
int tag,
MPI_Comm communicator,
MPI_Status* status)
MPI_Recv 方法特有的最后一个参数提供了接受到的信息的状态。这里一般会写MPI_STATUS_IGNORE
二、练习
2.1 一个简单的通信1
Process #0 发出
Process #1 接受
#include <iostream>
#include <mpi.h>
int main(int argc, char** argv){
MPI_Init(&argc, &argv);
int world_rank;
MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);
int num;
std::cin >> num;
if (world_rank == 0){
MPI_Send(
/* *data = */ &num,
/* count = */ 1,
/* datatype = */ MPI_INT,
/* destination = */ 1,
/* tag = */ 0,
/* communicator = */ MPI_COMM_WORLD);
std::cout << "Process " << world_rank << " send num " << num << std::endl;
}else if (world_rank == 1){
MPI_Recv(
/* *data = */ &num,
/* count = */ 1,
/* datatype = */ MPI_INT,
/* source = */ 0,
/* tag = */ 0,
/* communicator = */ MPI_COMM_WORLD,
/* status = */ MPI_STATUS_IGNORE);
std::cout << "Process " << world_rank << " receive num " << num << std::endl;
}
MPI_Finalize();
return 0;
}
2.2 一个简单的通信2
在两个进程之间进行实际通信。练习的目标如下:该程序将通过两个进程运行。程序将在命令行上获得两个随机整数,将其读入变量 local_value 。然后,根据进程的 ID,程序将具有不同的行为:
Process #0
- 发送获取的整数到Process #1
- 从Process #1中收到整数
- 打印出两个数的和
Process #1
- 从Process #0中收到整数
- 发送获取的整数到Process #0
- 打印出两个数的积
#include <iostream>
#include <mpi.h>
#include <cstdlib>
int main(int argc, char **argv) {
MPI_Init(&argc, &argv);
int rank;
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
// Read the local value of the process
// local_value will hold a specific int for process 0, and another for process 1
int local_value;
local_value = atoi(argv[1]);
int other_value;
other_value = atoi(argv[2]);
if (rank == 0) {
// Here, enter the code for the first process :
// 1- Send the value to process 1
MPI_Send(&local_value, 1, MPI_INT, 1, 0, MPI_COMM_WORLD);
// 2- Receive the value from process 1 (in other_value)
MPI_Recv(&other_value, 1, MPI_INT, 1, 1, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
// 3- Print the sum of the two values on stdout
std::cout << local_value + other_value << std::endl;
}
else {
// Here enter the code for the second process :
// 1- Receive the value from process 0 (in other_value)
MPI_Recv(&local_value, 1, MPI_INT, 0, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
// 2- Send the value to process 0
MPI_Send(&other_value, 1, MPI_INT, 0, 1, MPI_COMM_WORLD);
// 3- Print the product of the two values on stdout
std::cout << other_value *local_value << std::endl;
}
MPI_Finalize();
return 0;
}
如果代码这样写:
if (rank == 0) {
MPI_Send(&local_value, 1, MPI_INT, 1, 0, MPI_COMM_WORLD);
MPI_Recv(&other_value, 1, MPI_INT, 1, 1, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
std::cout << local_value + other_value << std::endl;
}
else {
MPI_Send(&other_value, 1, MPI_INT, 0, 1, MPI_COMM_WORLD);
MPI_Recv(&local_value, 1, MPI_INT, 0, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
std::cout << other_value *local_value << std::endl;
}
从逻辑上来看,这段代码有很高的风险导致死锁。原因是进程0和进程1都在发送数据后立即尝试接收数据,如果发送和接收都是阻塞操作,并且没有足够的缓冲区来处理这些操作,那么这种情况可能会导致死锁。
- 进程0 执行以下操作:
- 向进程1发送local_value。
- 立即尝试从进程1接收other_value。
- 进程1 执行以下操作:
- 发送other_value到进程0。
- 立即尝试从进程0接收local_value。
如果进程0的发送操作(MPI_Send)和进程1的发送操作都是阻塞的,并且都在等待接收方开始接收操作(如在系统缓冲区不足以处理即时发送时发生),那么进程0和进程1都将无法继续前进。进程0在等待从进程1接收数据,而进程1也在等待从进程0接收数据,这就形成了一个循环等待,导致死锁。
“阻塞”通信是一个重要概念。阻塞通信指的是在通信操作(如发送或接收数据)完成之前,相关的进程会停止执行任何后续操作。也就是说,进程会在那里“阻塞”等待,直到通信操作完成为止。
阻塞通信的工作方式
当你使用MPI_Send函数进行阻塞发送时,以下几种情况可能会发生:
- 等待缓冲区可用:如果系统的发送缓冲区不足以容纳要发送的消息,那么发送操作会阻塞,直到有足够的空间存放这条消息为止。在这种情况下,即使消息还没有被对方接收,只要消息被复制到系统缓冲区中,MPI_Send就会返回,允许发送进程继续执行。
- 直接传递给接收者:在一些实现中,如果接收者已经发出了接收请求,并且准备好接收数据,发送操作可能会直接将数据传递给接收者,然后才返回。在这种情况下,MPI_Send会阻塞,直到数据被复制到接收者的接收缓冲区中。
- 同步发送:对于某些特定类型的发送操作(如MPI_Ssend,即同步发送),发送操作需要等待接收方开始接收过程,这确保了发送和接收的同步进行。这种发送方式在发送完成前会阻塞发送进程。
阻塞的影响
阻塞的主要影响是可能导致程序的效率降低,特别是当多个进程需要频繁通信时。更严重的是,如果通信的设计不当,可能会导致死锁,即两个或多个进程互相等待对方完成操作,从而无法继续执行。
例如,在一个简单的两进程系统中,如果进程A和进程B都首先尝试发送大量数据给对方,并等待对方的回应,而他们的发送操作都是阻塞的并且依赖于对方的接收操作开始,那么这两个进程都将无法继续向前执行,因为它们都在等待对方先行动,从而发生死锁。
2.3 乒乓程序
一个乒乓游戏。两个进程会一直使用 MPI_Send 和 MPI_Recv 方法来“推挡”消息,直到他们决定不玩了。
#include <mpi.h>
#include <iostream>
#include <cstdlib>
int main(int argc, char** argv){
const int PING_PONG_LIMIT = 10;
MPI_Init(&argc, &argv);
int my_rank;
MPI_Comm_rank(MPI_COMM_WORLD, &my_rank);
int ping_pong_count = 0;
int partner_rank = (my_rank + 1) % 2;
while (ping_pong_count < PING_PONG_LIMIT){
if(my_rank == ping_pong_count % 2){
ping_pong_count++;
MPI_Send(&ping_pong_count, 1, MPI_INT, partner_rank, 0, MPI_COMM_WORLD);
std::cout << my_rank << " sent and incremented ping_pong_count " \
<< ping_pong_count << " to " << partner_rank << std::endl;
}else{
MPI_Recv(&ping_pong_count, 1, MPI_INT, partner_rank, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
std::cout << my_rank << " received ping_pong_count " \
<< ping_pong_count << " from " << partner_rank << std::endl;
}
}
MPI_Finalize();
return 0;
}
运行结果:
0 sent and incremented ping_pong_count 1 to 1
0 received ping_pong_count 2 from 1
0 sent and incremented ping_pong_count 3 to 1
0 received ping_pong_count 4 from 1
0 sent and incremented ping_pong_count 5 to 1
0 received ping_pong_count 6 from 1
0 sent and incremented ping_pong_count 7 to 1
0 received ping_pong_count 8 from 1
0 sent and incremented ping_pong_count 9 to 1
0 received ping_pong_count 10 from 1
1 received ping_pong_count 1 from 0
1 sent and incremented ping_pong_count 2 to 0
1 received ping_pong_count 3 from 0
1 sent and incremented ping_pong_count 4 to 0
1 received ping_pong_count 5 from 0
1 sent and incremented ping_pong_count 6 to 0
1 received ping_pong_count 7 from 0
1 sent and incremented ping_pong_count 8 to 0
1 received ping_pong_count 9 from 0
1 sent and incremented ping_pong_count 10 to 0
这个程序在其他机器上运行的输出可能会由于进程调度的不同跟上面的不一样。不管怎么样,可以看到,进程0和进程1在轮流发送和接收 ping_pong_count。
三、Dynamic Receiving with MPI Probe
(and MPI Status
)
在上一节中,我在接收时忽略了 MPI_Status
的值。
MPI_Status
是一个 结构体,如有必要,可以访问该结构,以获取收到的消息的更多信息。
MPI_Status
该结构体在 OpenMPI 中定义如下:
struct MPI_Struct {
int MPI_SOURCE; //消息的来源
int MPI_TAG; //消息的标记
int MPI_ERROR; //接收消息期间是否发生错误
//最后两个属性不用该被使用
int _cancelled;
size_t _ucount;
};
例子:
double values[5];
MPI_Status status;
MPI_Recv(&values, 5, MPI_DOUBLE, MPI_ANY_SOURCE, MPI_ANY_TAG, &status);
std::cout << "Received from process " << status.MPI_SOURCE
<< "; with tag " << status.MPI_TAG << std::endl;
如上所述,MPI_Recv
将 MPI_Status
结构体的地址作为参数(可以使用 MPI_STATUS_IGNORE
忽略)。 如果我们将 MPI_Status
结构体传递给 MPI_Recv
函数,则操作完成后将在该结构体中填充有关接收操作的其他信息。 三个主要的信息包括:
-
发送端rank. 发送端的rank存储在结构体的
MPI_SOURCE
元素中。也就是说,如果我们声明一个MPI_Status stat
变量,则可以通过stat.MPI_SOURCE
访问rank。 -
消息的标签. 消息的标签可以通过结构体的
MPI_TAG
元素访问(类似于MPI_SOURCE
)。 -
消息的长度. 消息的长度在结构体中没有预定义的元素。相反,我们必须使用
MPI_Get_count
找出消息的长度。MPI_Get_count( MPI_Status* status, MPI_Datatype datatype, int* count)
在 MPI_Get_count
函数中,使用者需要传递 MPI_Status
结构体,消息的 datatype(数据类型)
,并返回 count
。 变量 count
是已接收的 datatype
元素的数目。
3.1 MPI_Status
结构体查询的示例
查询 MPI_Status
结构体的程序。 程序将随机数量的数字发送给接收端,然后接收端找出发送了多少个数字。
#include <mpi.h>
#include <iostream>
#include <cstdlib>
#include <ctime>
int main(int argc, char** argv){
MPI_Init(&argc, &argv);
int world_rank;
MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);
const int MAX_NUMS = 100;
int nums[MAX_NUMS];
int num_amount;
if (world_rank == 0) {
// Pick a random amount of integers to send to process one
srand(time(0));
num_amount = (rand() / (float)RAND_MAX) * MAX_NUMS;
// Send the amount of integers to process one
MPI_Send(nums, num_amount, MPI_INT, 1, 0, MPI_COMM_WORLD);
std::cout << "0 sent " << num_amount << " numbers to 1" << std::endl;
}else if (world_rank == 1){
MPI_Status status;
// Receive at most MAX_NUMS from process zero
MPI_Recv(nums, MAX_NUMS, MPI_INT, 0, 0, MPI_COMM_WORLD, &status);
// After receiving the message, check the status to determine how many
// numbers were actually received
MPI_Get_count(&status, MPI_INT, &num_amount);
// Print off the amount of numbers, and also print additional information
// in the status object
std::cout << "1 received " << num_amount << " numbers from 0. Message source = " << status.MPI_SOURCE << ", tag = " << status.MPI_TAG << std::endl;
}
MPI_Barrier(MPI_COMM_WORLD);
MPI_Finalize();
}
进程 0 将最多 MAX_NUMS
个整数以随机数量发送到进程 1。 进程 1 然后调用 MPI_Recv
以获取总计 MAX_NUMS
个整数。 尽管进程 1 以 MAX_NUMS
作为 MPI_Recv
函数参数,但进程 1 将最多接收到此数量的数字。 在代码中,进程 1 使用 MPI_INT
作为数据类型的参数,调用 MPI_Get_count
,以找出实际接收了多少个整数。 除了打印出接收到的消息的大小外,进程 1 还通过访问 status 结构体的 MPI_SOURCE
和 MPI_TAG
元素来打印消息的来源和标签。
3.2 使用 MPI_Probe
找出消息大小
了解了 MPI_Status
的工作原理,现在我们可以使用它来发挥更高级的优势。 除了传递接收消息并简易地配备一个很大的缓冲区来为所有可能的大小的消息提供处理(就像上一个示例中所做的那样),可以使用 MPI_Probe
在实际接收消息之前查询消息大小。 函数原型看起来像这样:
MPI_Probe(
int source,
int tag,
MPI_Comm comm,
MPI_Status* status)
MPI_Probe
看起来与 MPI_Recv
非常相似。 实际上,可以将 MPI_Probe
视为 MPI_Recv
,除了不接收消息外,它们执行相同的功能。 与 MPI_Recv
类似,MPI_Probe
将阻塞具有匹配标签和发送端的消息。 当消息可用时,它将填充 status 结构体。 然后,用户可以使用 MPI_Recv
接收实际的消息。
#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>
#include <ctime>
int main(int argc, char** argv){
MPI_Init(&argc, &argv);
int world_rank;
MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);
int number_amount;
if (world_rank == 0) {
const int MAX_NUMBERS = 100;
int numbers[MAX_NUMBERS];
// Pick a random amont of integers to send to process one
srand(time(0));
number_amount = (rand() / (float)RAND_MAX) * MAX_NUMBERS;
// Send the amount of integers to process one
MPI_Send(numbers, number_amount, MPI_INT, 1, 0, MPI_COMM_WORLD);
printf("0 sent %d numbers to 1\n", number_amount);
} else if (world_rank == 1) {
MPI_Status status;
// Probe for an incoming message from process zero
MPI_Probe(0, 0, MPI_COMM_WORLD, &status);
// When probe returns, the status object has the size and other
// attributes of the incoming message. Get the size of the message.
MPI_Get_count(&status, MPI_INT, &number_amount);
// Allocate a buffer just big enough to hold the incoming numbers
int* number_buf = (int*)malloc(sizeof(int) * number_amount);
// Now receive the message with the allocated buffer
MPI_Recv(number_buf, number_amount, MPI_INT, 0, 0, MPI_COMM_WORLD,
MPI_STATUS_IGNORE);
printf("1 dynamically received %d numbers from 0.\n",
number_amount);
free(number_buf);
}
}
与上一个示例类似,进程 0 选择随机数量的数字发送给进程 1。 不同之处在于,进程 1 现在调用 MPI_Probe
,以找出进程 0 试图发送多少个元素(利用 MPI_Get_count
)。 然后,进程 1 分配适当大小的缓冲区并接收数字。
尽管这个例子很简单,但是
MPI_Probe
构成了许多动态 MPI 应用程序的基础。 例如,控制端/执行子程序在交换变量大小的消息时通常会大量使用MPI_Probe
。 作为练习,对MPI_Recv
进行包装,将MPI_Probe
用于可能编写的任何动态应用程序。 它将使代码看起来更美好。
五、(阻塞)点对点通信应用——随机游走(random walk)
5.1 问题简述
问题详情见
5.2 使用 MPI_Send
和 MPI_Recv
组织代码
- 明确每个进程在域中的部分。
- 每个进程初始化 N 个 walker,所有这些 walker 都从其局部域的第一个值开始。
- 每个 walker 都有两个相关的整数值:当前位置和剩余步数。
- Walkers 开始遍历该域,并传递到其他进程,直到完成所有移动。
- 当所有 walker 完成时,该进程终止。
分解域。 该函数将考虑域的总大小,并为 MPI 进程找到合适的子域。 它还会将域的其余部分交给最终的进程。 为了简单起见,调用 MPI_Abort 处理发现的任何错误。该函数返回一个子域开始和一个子域大小。
void decompose_domain(int domain_size, int world_rank,
int world_size, int* subdomain_start,
int* subdomain_size){
if (world_size > domain_size){
// 不要担心这种特殊情况,这里假定domain_size > world_size
MPI_Abort(MPI_COMM_WORLD, 1);
}
*subdomain_start = domain_size / world_size * world_rank;
*subdomain_size = domain_size / world_size;
if (world_rank == world_size - 1){
//因为world_rank是从0开始的
//所以当处理最后一个Process的时候将剩余的domain给它
*subdomain_size += domain_size % world_size;
}
}
定义 walker 结构
typedef struct {
int location;
int num_steps_left_in_walk;
} Walker;
初始化函数initialize_walkers
,它采用子域边界,并将 walker 添加到 incoming_walkers vector 中。
void initialize_walkers(int num_walkers_per_proc, int max_walk_size,
int subdomain_start, int subdomain_size,
vector<Walker>* incoming_walkers){
Walker walker;
for (int i = 0; i < num_walkers_per_proc; i++){
//初始化walker在当前进程中的属性值,位置是当前进程区域的开始位置,向左进行随机游走
walker.location = subdomain_start;
walker.num_steps_left_in_walk =
(rand() / (float)RAND_MAX) * max_walk_size;
incoming_walkers->push_back(walker);
}
}
使 walkers 前进。初始化之后,就该使 walkers 前进了。 让我们从一个移动功能开始。 此功能负责使 walkers 前进,直到完成移动为止。 如果超出局部域范围,则将其添加到 outgoing_walkers vector
void walk(Walker* walker, int subdomain_start, int subdomain_size,
int domain_size, vector<Walker>* outgoing_walkers) {
while (walker->num_steps_left_in_walk > 0) {
// 如果walker走到了当前进程的尽头,则将walker放入outgoing_walker中去
if (walker->location == subdomain_start + subdomain_size) {
// 如果walker走到了整个区域的尽头,则将walker的位置重置为整个区域的起始位置,实现环形边界条件
if (walker->location == domain_size) {
walker->location = 0;
}
outgoing_walkers->push_back(*walker);
break;
} else {
walker->num_steps_left_in_walk--;
walker->location++;
}
}
}
发送待传出的 walker 的函数和接收待传入的 walker 的函数。已经建立了初始化函数(用于填充传入的 walker 列表)和移动函数(用于填充传出的 walker 列表),我们仅再需要两个函数:发送待传出的 walker 的函数和接收待传入的 walker 的函数。 发送功能如下所示:
void send_outgoing_walkers(vector<Walker>* outgoing_walkers,
int world_rank, int world_size) {
// Send the data as an array of MPI_BYTEs to the next process.
// The last process sends to process zero.
MPI_Send((void*)outgoing_walkers->data(),
outgoing_walkers->size() * sizeof(Walker), MPI_BYTE,
(world_rank + 1) % world_size, 0, MPI_COMM_WORLD); //计算了要发送的数据的总字节数,MPI_BYTE表示一个字节
// Clear the outgoing walkers
outgoing_walkers->clear(); // 清空向量
}
使用MPI_Probe
,提前知道将要接受多少walkers,再用MPI_Recv
动态地接收walkers的消息
void receive_incoming_walkers(vector<Walker>* incoming_walkers,
int world_rank, int world_size) {
MPI_Status status;
// Receive from the process before you. If you are process zero,
// receive from the last process
int incoming_rank =
(world_rank == 0) ? world_size - 1 : world_rank - 1;
MPI_Probe(incoming_rank, 0, MPI_COMM_WORLD, &status);
// Resize your incoming walker buffer based on how much data is
// being received
int incoming_walkers_size;
MPI_Get_count(&status, MPI_BYTE, &incoming_walkers_size); // 获取接收到的消息的字节数
incoming_walkers->resize(
incoming_walkers_size / sizeof(Walker)); //计算walker的数量
MPI_Recv((void*)incoming_walkers->data(), incoming_walkers_size,
MPI_BYTE, incoming_rank, 0, MPI_COMM_WORLD,
MPI_STATUS_IGNORE);
}
现在已经建立了程序的主要功能。 必须将所有这些功能集成在一起,如下所示:
- 初始化 walkers.
- 使用 walk 函数使 walkers 前进。
- 发出 outgoing_walkers 向量中的所有的 walkers。
- 将新接收的 walkers 放入 incoming_walkers 向量中。
- 重复步骤 2 到 4,直到所有 walkers 完成。
这里的incoming_walker和outcoming_walker两个向量,更像是MPI发送和接受消息的缓冲区
incoming_walkers
向量用于存储从其他进程接收到的行走者数据。每个进程在接收到其他进程发送的行走者数据后,将这些数据存储在incoming_walkers
向量中,然后根据需要进行处理。outgoing_walkers
向量用于存储要发送给其他进程的行走者数据。每个进程在处理完当前域内的行走者数据后,将要发送的行走者数据存储在outgoing_walkers
向量中,然后将其发送给下一个进程。
5.2.1 第一次尝试(有死锁风险)
// 找到当前domain
decompose_domain(domain_size, world_rank, world_size,
&subdomain_start, &subdomain_size);
// 在当前domain里初始化walker
initialize_walkers(num_walkers_per_proc, max_walk_size,
subdomain_start, subdomain_size,
&incoming_walkers);
while (!all_walkers_finished) { // Determine walker completion later
// Process all incoming walkers
for (int i = 0; i < incoming_walkers.size(); i++) {
walk(&incoming_walkers[i], subdomain_start, subdomain_size,
domain_size, &outgoing_walkers);
}
// Send all outgoing walkers to the next process.
send_outgoing_walkers(&outgoing_walkers, world_rank,
world_size);
// Receive all the new incoming walkers
receive_incoming_walkers(&incoming_walkers, world_rank,
world_size);
}
一切看起来都很正常,但是函数调用的顺序引入了一种非常可能的情形 - 死锁。
5.2.2 死锁及预防
5.2.3 确定每个walker的结束时间
现在是程序的最后一步 - 确定每个 walker 何时结束。 由于 walkers 可以随机行走,因此它们可以在任何一个进程中结束它们的旅程。 因此,如果没有某种额外的通信,所有进程都很难知道 walkers 何时全部结束。 一种可能的解决方案是让进程零跟踪所有已完成的 walker,然后告诉其他所有进程何时终止。 但是,这样的解决方案非常麻烦,因为每个进程都必须向进程 0 报告所有完成的 walker,然后还要处理不同类型的传入消息。
在本节中,make it simple。 由于我们知道任意一个 walker 可以行进的最大距离和每对发送和接收对它可以行进的最小总大小(子域大小),因此可以计算出终止之前每个进程应该执行的发送和接收量。 在我们避免死锁的策略中考虑这一特征,该程序的最后主要部分如下所示:
// 找到当前domain
decompose_domain(domain_size, world_rank, world_size,
&subdomain_start, &subdomain_size);
// 在当前domain里初始化walker
initialize_walkers(num_walkers_per_proc, max_walk_size,
subdomain_start, subdomain_size,
&incoming_walkers);
// 计算出终止之前每个进程应该执行的发送和接收量
int maximum_sends_recvs =
max_walk_size / (domain_size / world_size) + 1;
for (int m = 0; m < maximum_sends_recvs; m++) {
// Process all incoming walkers
for (int i = 0; i < incoming_walkers.size(); i++) {
walk(&incoming_walkers[i], subdomain_start, subdomain_size,
domain_size, &outgoing_walkers);
}
// Send and receive if you are even and vice versa for odd
if (world_rank % 2 == 0) {
send_outgoing_walkers(&outgoing_walkers, world_rank,
world_size);
receive_incoming_walkers(&incoming_walkers, world_rank,
world_size);
} else {
receive_incoming_walkers(&incoming_walkers, world_rank,
world_size);
send_outgoing_walkers(&outgoing_walkers, world_rank,
world_size);
}
}
5.2.4 最终代码
#include <iostream>
#include <vector>
#include <cstdlib>
#include <time.h>
#include <mpi.h>
using namespace std;
typedef struct {
int location;
int num_steps_left_in_walk;
} Walker;
void decompose_domain(int domain_size, int world_rank,
int world_size, int* subdomain_start,
int* subdomain_size){
if (world_size > domain_size){
// 不要担心这种特殊情况,这里假定domain_size > world_size
MPI_Abort(MPI_COMM_WORLD, 1);
}
*subdomain_start = domain_size / world_size * world_rank;
*subdomain_size = domain_size / world_size;
if (world_rank == world_size - 1){
//因为world_rank是从0开始的
//所以当处理最后一个Process的时候将剩余的domain给它
*subdomain_size += domain_size % world_size;
}
}
void initialize_walkers(int num_walkers_per_proc, int max_walk_size,
int subdomain_start, int subdomain_size,
vector<Walker>* incoming_walkers){
Walker walker;
for (int i = 0; i < num_walkers_per_proc; i++){
//初始化walker在当前进程中的属性值,位置是当前进程区域的开始位置,向左进行随机游走
walker.location = subdomain_start;
walker.num_steps_left_in_walk =
(rand() / (float)RAND_MAX) * max_walk_size;
incoming_walkers->push_back(walker);
}
}
void walk(Walker* walker, int subdomain_start, int subdomain_size,
int domain_size, vector<Walker>* outgoing_walkers) {
while (walker->num_steps_left_in_walk > 0) {
// 如果walker走到了当前进程的尽头,则将walker放入outgoing_walker中去
if (walker->location == subdomain_start + subdomain_size) {
// 如果walker走到了整个区域的尽头,则将walker的位置重置为整个区域的起始位置,实现环形边界条件
if (walker->location == domain_size) {
walker->location = 0;
}
outgoing_walkers->push_back(*walker);
break;
} else {
walker->num_steps_left_in_walk--;
walker->location++;
}
}
}
void send_outgoing_walkers(vector<Walker>* outgoing_walkers,
int world_rank, int world_size) {
// Send the data as an array of MPI_BYTEs to the next process.
// The last process sends to process zero.
MPI_Send((void*)outgoing_walkers->data(),
outgoing_walkers->size() * sizeof(Walker), MPI_BYTE,
(world_rank + 1) % world_size, 0, MPI_COMM_WORLD);
// Clear the outgoing walkers list
outgoing_walkers->clear();
}
void receive_incoming_walkers(vector<Walker>* incoming_walkers,
int world_rank, int world_size) {
// Probe for new incoming walkers
MPI_Status status;
// Receive from the process before you. If you are process zero,
// receive from the last process
int incoming_rank =
(world_rank == 0) ? world_size - 1 : world_rank - 1;
MPI_Probe(incoming_rank, 0, MPI_COMM_WORLD, &status);
// Resize your incoming walker buffer based on how much data is
// being received
int incoming_walkers_size;
MPI_Get_count(&status, MPI_BYTE, &incoming_walkers_size);
incoming_walkers->resize(incoming_walkers_size / sizeof(Walker));
MPI_Recv((void*)incoming_walkers->data(), incoming_walkers_size,
MPI_BYTE, incoming_rank, 0, MPI_COMM_WORLD,
MPI_STATUS_IGNORE);
}
int main(int argc, char** argv) {
int domain_size;
int max_walk_size;
int num_walkers_per_proc;
// 在程序开始时检查命令行参数的数量是否足够
//用户应该提供三个参数:
//domain_size 表示域的大小,max_walk_size 表示行走者的最大步数,num_walkers_per_proc 表示每个进程的行走者数量
if (argc < 4) {
cerr << "Usage: random_walk domain_size max_walk_size "
<< "num_walkers_per_proc" << endl;
exit(1);
}
domain_size = atoi(argv[1]);
max_walk_size = atoi(argv[2]);
num_walkers_per_proc = atoi(argv[3]);
MPI_Init(NULL, NULL);
int world_size;
MPI_Comm_size(MPI_COMM_WORLD, &world_size);
int world_rank;
MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);
// 利用当前系统时间和进程的排名来初始化随机数生成器的种子,以产生随机的结果,
//同时确保每个进程在不同时间点初始化的种子是不同的,从而使得每个进程产生的随机数序列也是不同
srand(time(NULL) * world_rank);
int subdomain_start, subdomain_size;
vector<Walker> incoming_walkers, outgoing_walkers;
// 找到当前进程subdomain
decompose_domain(domain_size, world_rank, world_size,
&subdomain_start, &subdomain_size);
// 在当前subdomain里初始化num_walkers_per_proc的数量的walker
initialize_walkers(num_walkers_per_proc, max_walk_size, subdomain_start,
&incoming_walkers);
cout << "Process " << world_rank << " initiated " << num_walkers_per_proc
<< " walkers in subdomain " << subdomain_start << " - "
<< subdomain_start + subdomain_size - 1 << endl;
// 计算出终止之前每个进程应该执行的发送和接收量
int maximum_sends_recvs = max_walk_size / (domain_size / world_size) + 1;
for (int m = 0; m < maximum_sends_recvs; m++) {
//当前进程中,所有walker都被塞入到了incoming_walker中
// Process all incoming walkers
for (int i = 0; i < incoming_walkers.size(); i++) {
walk(&incoming_walkers[i], subdomain_start, subdomain_size,
domain_size, &outgoing_walkers);
}
cout << "Process " << world_rank << " sending " << outgoing_walkers.size()
<< " outgoing walkers to process " << (world_rank + 1) % world_size
<< endl;
if (world_rank % 2 == 0) {
// 将所有在outgoing_walkers里面的walker都送给下一个进程
send_outgoing_walkers(&outgoing_walkers, world_rank,
world_size);
// Receive all the new incoming walkers
receive_incoming_walkers(&incoming_walkers, world_rank,
world_size);
} else {
// Receive all the new incoming walkers
receive_incoming_walkers(&incoming_walkers, world_rank,
world_size);
// Send all outgoing walkers to the next process.
send_outgoing_walkers(&outgoing_walkers, world_rank,
world_size);
}
cout << "Process " << world_rank << " received " << incoming_walkers.size()
<< " incoming walkers" << endl;
}
cout << "Process " << world_rank << " done" << endl;
MPI_Finalize();
return 0;
}
稍微写了一下代码思路:
编译命令及运行结果:
(base) ustcxp:~/cpp/parallel_program/MPI$ mpirun -np 4 ./rw 100 500 10
Process 1 initiated 10 walkers in subdomain 25 - 49
Process 1 sending 9 outgoing walkers to process 2
Process 2 initiated 10 walkers in subdomain 50 - 74
Process 2 sending 10 outgoing walkers to process 3
Process 0 initiated 10 walkers in subdomain 0 - 24
Process 0 sending 10 outgoing walkers to process 1
Process 2 received 9 incoming walkers
Process 2 sending 9 outgoing walkers to process 3
Process 1 received 10 incoming walkers
Process 1 sending 10 outgoing walkers to process 2
Process 3 initiated 10 walkers in subdomain 75 - 99
Process 3 sending 10 outgoing walkers to process 0
Process 3 received 10 incoming walkers
Process 3 sending 10 outgoing walkers to process 0
Process 2 received 10 incoming walkers
Process 2 sending 10 outgoing walkers to process 3
Process 2 received 9 incoming walkers
Process 2 sending 9 outgoing walkers to process 3
Process 1 received 9 incoming walkers
Process 1 sending 9 outgoing walkers to process 2
Process 1 received 10 incoming walkers
Process 1 sending 10 outgoing walkers to process 2
Process 3 received 9 incoming walkers
Process 3 sending 8 outgoing walkers to process 0
Process 3 received 10 incoming walkers
Process 3 sending 9 outgoing walkers to process 0
Process 3 received 9 incoming walkers
Process 3 sending 9 outgoing walkers to process 0
Process 3 received 9 incoming walkers
Process 0 received 10 incoming walkers
Process 0 sending 9 outgoing walkers to process 1
Process 0 received 10 incoming walkers
Process 0 sending 10 outgoing walkers to process 1
Process 0 received 8 incoming walkers
Process 0 sending 8 outgoing walkers to process 1
Process 0 received 9 incoming walkers
Process 0 sending 9 outgoing walkers to process 1
Process 0 received 9 incoming walkers
Process 0 sending 7 outgoing walkers to process 1
Process 0 received 7 incoming walkers
Process 0 sending 7 outgoing walkers to process 1
Process 3 sending 7 outgoing walkers to process 0
Process 3 received 6 incoming walkers
Process 3 sending 5 outgoing walkers to process 0
Process 1 received 8 incoming walkers
Process 1 sending 7 outgoing walkers to process 2
Process 1 received 9 incoming walkers
Process 1 sending 8 outgoing walkers to process 2
Process 1 received 7 incoming walkers
Process 1 sending 7 outgoing walkers to process 2
Process 2 received 10 incoming walkers
Process 2 sending 9 outgoing walkers to process 3
Process 2 received 7 incoming walkers
Process 2 sending 6 outgoing walkers to process 3
Process 2 received 8 incoming walkers
Process 2 sending 7 outgoing walkers to process 3
Process 2 received 7 incoming walkers
Process 2 sending 7 outgoing walkers to process 3
Process 0 received 5 incoming walkers
Process 0 sending 5 outgoing walkers to process 1
Process 0 received 6 incoming walkers
Process 0 sending 6 outgoing walkers to process 1
Process 0 received 7 incoming walkers
Process 0 sending 7 outgoing walkers to process 1
Process 0 received 6 incoming walkers
Process 0 sending 4 outgoing walkers to process 1
Process 0 received 5 incoming walkers
Process 0 sending 5 outgoing walkers to process 1
Process 0 received 5 incoming walkers
Process 0 sending 5 outgoing walkers to process 1
Process 3 received 7 incoming walkers
Process 3 sending 6 outgoing walkers to process 0
Process 3 received 7 incoming walkers
Process 3 sending 7 outgoing walkers to process 0
Process 3 received 6 incoming walkers
Process 3 sending 6 outgoing walkers to process 0
Process 3 received 5 incoming walkers
Process 3 sending 5 outgoing walkers to process 0
Process 3 received 6 incoming walkers
Process 3 sending 5 outgoing walkers to process 0
Process 3 received 5 incoming walkers
Process 3 sending 5 outgoing walkers to process 0
Process 3 received 4 incoming walkers
Process 1 received 7 incoming walkers
Process 1 sending 7 outgoing walkers to process 2
Process 1 received 5 incoming walkers
Process 1 sending 5 outgoing walkers to process 2
Process 1 received 6 incoming walkers
Process 1 sending 6 outgoing walkers to process 2
Process 1 received 7 incoming walkers
Process 1 sending 6 outgoing walkers to process 2
Process 1 received 4 incoming walkers
Process 1 sending 4 outgoing walkers to process 2
Process 1 received 5 incoming walkers
Process 1 sending 4 outgoing walkers to process 2
Process 2 received 7 incoming walkers
Process 2 sending 6 outgoing walkers to process 3
Process 2 received 5 incoming walkers
Process 2 sending 5 outgoing walkers to process 3
Process 2 received 6 incoming walkers
Process 2 sending 6 outgoing walkers to process 3
Process 2 received 6 incoming walkers
Process 2 sending 5 outgoing walkers to process 3
Process 2 received 4 incoming walkers
Process 2 sending 4 outgoing walkers to process 3
Process 0 received 5 incoming walkers
Process 0 sending 3 outgoing walkers to process 1
Process 0 received 4 incoming walkers
Process 0 sending 3 outgoing walkers to process 1
Process 0 received 4 incoming walkers
Process 0 sending 2 outgoing walkers to process 1
Process 3 sending 4 outgoing walkers to process 0
Process 3 received 4 incoming walkers
Process 3 sending 4 outgoing walkers to process 0
Process 3 received 5 incoming walkers
Process 3 sending 2 outgoing walkers to process 0
Process 2 received 4 incoming walkers
Process 2 sending 4 outgoing walkers to process 3
Process 2 received 5 incoming walkers
Process 2 sending 5 outgoing walkers to process 3
Process 2 received 2 incoming walkers
Process 2 sending 1 outgoing walkers to process 3
Process 2 received 3 incoming walkers
Process 2 sending 2 outgoing walkers to process 3
Process 2 received 2 incoming walkers
Process 2 sending 1 outgoing walkers to process 3
Process 2 received 1 incoming walkers
Process 2 sending 0 outgoing walkers to process 3
Process 2 received 0 incoming walkers
Process 2 sending 0 outgoing walkers to process 3
Process 2 received 0 incoming walkers
Process 2 sending 0 outgoing walkers to process 3
Process 2 received 0 incoming walkers
Process 2 done
Process 1 received 5 incoming walkers
Process 1 sending 5 outgoing walkers to process 2
Process 1 received 3 incoming walkers
Process 1 sending 2 outgoing walkers to process 2
Process 1 received 3 incoming walkers
Process 1 sending 3 outgoing walkers to process 2
Process 1 received 2 incoming walkers
Process 1 sending 2 outgoing walkers to process 2
Process 1 received 1 incoming walkers
Process 1 sending 1 outgoing walkers to process 2
Process 1 received 1 incoming walkers
Process 1 sending 0 outgoing walkers to process 2
Process 1 received 2 incoming walkers
Process 1 sending 0 outgoing walkers to process 2
Process 1 received 0 incoming walkers
Process 1 sending 0 outgoing walkers to process 2
Process 1 received 0 incoming walkers
Process 1 done
Process 3 received 1 incoming walkers
Process 3 sending 1 outgoing walkers to process 0
Process 3 received 2 incoming walkers
Process 3 sending 2 outgoing walkers to process 0
Process 3 received 1 incoming walkers
Process 3 sending 0 outgoing walkers to process 0
Process 3 received 0 incoming walkers
Process 3 sending 0 outgoing walkers to process 0
Process 3 received 0 incoming walkers
Process 3 sending 0 outgoing walkers to process 0
Process 3 received 0 incoming walkers
Process 3 done
Process 0 received 2 incoming walkers
Process 0 sending 1 outgoing walkers to process 1
Process 0 received 1 incoming walkers
Process 0 sending 1 outgoing walkers to process 1
Process 0 received 2 incoming walkers
Process 0 sending 2 outgoing walkers to process 1
Process 0 received 0 incoming walkers
Process 0 sending 0 outgoing walkers to process 1
Process 0 received 0 incoming walkers
Process 0 sending 0 outgoing walkers to process 1
Process 0 received 0 incoming walkers
Process 0 done
总结
这里主要介绍了阻塞点对点通信。
MPI_Send()
MPI_Recv()
MPI_Status
结构体对于动态获取消息长度有着关键作用(无法直接得到消息长度)MPI_Probe
操作MPI_Status
结构体发送长度。通常在MPI_Recv()
函数前面
Appendix
1. std::vector::data()
std::vector::data()
是 C++ 中的 STL,它返回一个指向内存数组的直接指针,该内存数组由向量内部用于存储其拥有的元素。
vector_name.data()
参数:该函数不接受任何参数。
返回值:该函数返回一个指向数组中第一个元素的指针,该指针在向量内部使用。
// C++ program to demonstrate the
// vector::data() function
#include <bits/stdc++.h>
using namespace std;
int main()
{
// initialising vector
vector<int> vec = { 10, 20, 30, 40, 50 };
// memory pointer pointing to the
// first element
int* pos = vec.data();
// prints the vector
cout << "The vector elements are: ";
for (int i = 0; i < vec.size(); ++i)
cout << *pos++ << " ";
return 0;
}
参考
- Introduction to MPI
- MPI Tutorials
- C+±数组-vector:.data()函数