文章目录
- 并行计算
- MPI(进程级并行)
- 基本结构
- 数据类型
- 点对点通信
- 阻塞
- 非阻塞
- 非连续数据打包
- 聚合通信
- Communicator & Cartisen Grid
- OpenMP(线程级并行)
- 简介
- 基本制导语句
- worksharing construct
- Sections
- Single
- For
- 临界区 & 原子操作
- Task
并行计算
并行类型:
- 进程级并行:网络连接,内存不共享
- 线程级并行:共享内存,同构 vs 异构
- 线程内并行:指令级并行(流水线、多发射),向量化(SIMD,AVX)
基本方法:
- 分解。数据划分、任务定义
- 协调。通信、同步、任务调度
基本原则:
- 平衡。处理器之间负载平衡、众核之间负载平衡
- 压榨。尽量使得各个部件同时运行
任务划分,
- 按数据划分:按输入数据(列主序)、按输出数据(行主序)、按输入输出数据(棋盘式)、按中间数据(矩阵乘,列行外积)
- 递归划分:归并排序、快速排序
- 探索式划分:对状态空间搜索,每个线程一种搜索策略,并不知道下一步会有多少任务
- 猜测式划分:并行处理不同时间的事件,如果时间来到某个事件改变了状态,那么就回滚
- 混合划分
并行模式,
- 数据并行模式:矩阵计算、数据处理等
- 任务图模式:使用任务依赖图
- 主从模式:主线程产生任务,分配给各个工作线程
- 流水线 / 生产者-消费者模式:每个处理部件完成一个阶段的任务,多个部件同时处理不同数据的不同阶段(GPU stream)
- 混合模式
MPI(进程级并行)
Message Passing Interface(MPI):一种基于信息传递的并行编程技术,定义了一组具有可移植性的编程接口标准(并非一种语言或者接口)。支持点对点通信和广播。MPI 的目标是高性能、大规模性、可移植性,在今天仍为高性能计算的主要模型。OpenMPI 函数库,微软 MPI 文档
基本结构
程序结构(C语言版)
MPI_Init(&argc, &argv); //初始化MPI
MPI_Comm_size(MPI_COMM_WORLD, &nprocs); //设置进程数
MPI_Comm_rank(MPI_COMM_WORLD, &myrank); //获取进程ID
... ...
MPI_Finalize(); //结束MPI
编译:
mpicc -o test ./test.c
mpicxx -o hello++ hello++.cxx
mpif90 -o pi3f90 pi3f90.f90
运行:
mpirun –np 4 –host HOST1,HOST2,HOST3,HOST4
mpirun -np 4 -hostfile hosts ./test
数据类型
MPI 数据类型 | C语言数据类型 |
---|---|
MPI_INT | int |
MPI_FLOAT | float |
MPI_DOUBLE | double |
MPI_SHORT | short |
MPI_LONG | long |
MPI_CHAR | char |
MPI_UNSIGNED_CHAR | unsigned char |
MPI_UNSIGNED_SHORT | unsigned short |
MPI_UNSIGNED | unsigned |
MPI_UNSIGNED_LONG | unsigned long |
MPI_LONG_DOUBLE | long double |
MPI_BYTE | unsigned char |
MPI_PACKED | 无 |
点对点通信
阻塞
阻塞式消息发送
int MPI_Send(const void *buf, int count, MPI_Datatype datatype,
int dest, int tag, MPI_Comm comm);
buf
:发送缓冲区的首地址count
:需要发送的数据项个数datatype
:每个被发送元素的数据类型dest
:目标进程的进程号(rank)tag
:消息标识(接收端要使用同样的标号,否则无法传递)comm
:通信域(Communicator,指出哪些进程参与通信)- 返回值:函数成功时返回
MPI_SUCCESS
,否则返回错误代码
阻塞式消息接收
int MPI_Recv(void *buf, int count, MPI_Datatype datatype,
int source, int tag, MPI_Comm comm, MPI_Status *status);
buf
:接收缓冲区的首地址count
:接收缓冲区最多存放多少个数据项datatype
:每个被接收元素的数据类型source
:发送进程的进程号(若设为MPI_ANY_SOURCE
,则可以传递任意进程的消息)tag
:消息标识(若设为MPI_ANY_TAG
,则可以传递任意标号的消息)comm
:通信域status
:函数返回时,存放发送方的进程号、消息 tag 等status->MPI_source
status->MPI_tag
status->MPI_error
- 返回值:函数成功时返回
MPI_SUCCESS
,否则返回错误代码
同时发送与接收
如果两个进程先执行 MPI_Send
后执行 MPI_Recv
,那么有可能出现死锁。
int MPI_Sendrecv(const void *sendbuf, int sendcount, MPI_Datatype sendtype, int dest, int sendtag,
void *recvbuf, int recvcount, MPI_Datatype recvtype, int source, int recvtag,
MPI_Comm comm, MPI_Status *status);
int MPIAPI MPI_Sendrecv_replace(void *buf, int count, MPI_Datatype datatype,
int dest, int sendtag, int source, int recvtag,
MPI_Comm comm, MPI_Status *status);
前者分别设置发送缓冲区、接收缓冲区,后者的发送和接收缓冲区是同一个。
非阻塞
非阻塞消息发送
int MPI_Isend(void *buf, int count, MPI_Datatype datatype,
int dest, int tag, MPI_Comm comm, MPI_Request *request);
- 大部分参数与
MPI_Send
一样 request
:未完成的 MPI (发送)请求的句柄
非阻塞消息接收
int MPI_Irecv(void *buf, int count, MPI_Datatype datatype,
int source, int tag, MPI_Comm comm, MPI_Request *request);
- 大部分参数与
MPI_Recv
一样 request
:未完成的 MPI (接收)请求的句柄
等待完成(阻塞)
int MPI_Wait(MPI_Request *request, MPI_Status *status);
request
:未完成的 MPI (发送 / 接收)请求的句柄status
:函数返回时,存放发送方的进程号、消息 tag 等
检验完成(非阻塞)
int MPI_Test(MPI_Request *request, int *flag, MPI_Status *status);
request
:未完成的 MPI (发送 / 接收)请求的句柄flag
:函数返回时, 非零值指示请求已完成status
:函数返回时,存放发送方的进程号、消息 tag 等
非连续数据打包
数据打包
int MPI_Pack(void *inbuf, int incount, MPI_Datatype datatype,
void *outbuf, int outsize, int *position, MPI_Comm comm);
inbuf
:输入缓冲区incount
:输入数据项个数datatype
:输入数据项的类型outbuf
:输出缓冲区outsize
:输出缓冲区字节长度position
:缓冲区当前字节位置;函数返回时,存放缓冲区新的位置comm
:通信域
数据解包
int MPI_Unpack(void *inbuf, int insize, int *position,
void *outbuf, int outcount, MPI_Datatype datatype, MPI_Comm comm);
inbuf
:输入缓冲区insize
:输入缓冲区字节长度position
:缓冲区当前字节位置;函数返回时,存放缓冲区新的位置outbuf
:输出缓冲区outcount
:输出数据项个数datatype
:输出数据项的类型comm
:通信域
聚合通信
为了更高效地完成多个进程间的消息传递,可以使用组通信。
基本组播
int MPI_Bcast(void *buffer, int count, MPI_Datatype datatype, int root, MPI_Comm comm);
分散
int MPI_Scatter(void *sendbuf, int sendcnt, MPI_Datatype sendtype,
void *recvbuf, int recvcnt, MPI_Datatype recvtype, int root, MPI_Comm comm);
归约
int MPI_Reduce(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype,
MPI_Op op, int root, MPI_Comm comm);
全部归约
int MPI_Allreduce (void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype,
MPI_Op op, MPI_Comm comm);
聚集
int MPI_Gather (void *sendbuf, int sendcnt, MPI_Datatype sendtype,
void *recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm);
全部聚集
int MPI_Allgather (void *sendbuf, int sendcount, MPI_Datatype sendtype,
void *recvbuf, int recvcount, MPI_Datatype recvtype, MPI_Comm comm);
全到全
int MPI_Alltoall(void *sendbuf, int sendcount, MPI_Datatype sendtype,
void *recvbuf, int recvcnt, MPI_Datatype recvtype, MPI_Comm comm);
Communicator & Cartisen Grid
将若干结点自动排布成多维网格 dims
:
int MPI_Dims_create(int nnodes, int ndims, int dims[]);
创建一个笛卡尔拓扑结构的通信域 comm_cart
:
int MPI_Cart_create(MPI_Comm comm_old, int ndims, const int dims[], const int periods[], int reorder, MPI_Comm *comm_cart);
得到当前进程的笛卡尔坐标 coords
:
int MPI_Cart_coords(MPI_Comm comm, int rank, int maxdims, int coords[]);
得到当前笛卡尔空间通信域的信息 dims, periods, coords
:
int MPI_Cart_get(MPI_Comm comm, int maxdims, int dims[], int periods[], int coords[]);
得到某个笛卡尔坐标上进程的进程号 rank
:
int MPI_Cart_rank(MPI_Comm comm, const int coords[], int *rank);
将一个通信域按照颜色 color
分裂成多个通信域 newcomm
,自己在新通信域中的进程号是 key
:
int MPI_Comm_split(MPI_Comm comm, int color, int key, MPI_Comm *newcomm);
用途:通用矩阵乘法(General Matrix Multiply, GEMM)的多进程并行。特殊的矩阵向量乘法可以:按列分割、按行分割、棋盘式分割。
OpenMP(线程级并行)
OpenMP 是用于共享内存并行系统的多处理器程序设计的一套指导性编译处理方案。程序员通过在源代码中加入专用的 #pragma
来指明自己的意图,由此编译器可以自动将程序进行并行化,并在必要之处加入同步互斥以及通信。OpenMP 官网
简介
OpenMP 的运行模式为:Fork and Join
支持 C/C++,其编译指令为
gcc -fopenmp somepcode.c -o somepcode_openmp
OpenMP基本制导语句:
- Omp sections
- Omp parallel
- Omp parallel for
- Omp critical
- ……
运行时函数:
- omp_get_thread_num
- omp_set_thread_num
- omp_get_num_threads
环境变量:
- OMP_NUM_THREADS
基本制导语句
#pragma omp parallel [clause[ [,] clause] ... ]
{
//SPMD
}
clause 有如下的常用选择:
if([parallel :] scalar-expression)
:表达式为真,才会作用于此段代码;if 作用于 parallel 语句num_threads(integer-expression)
:指定线程数default(shared | none)
:缺省情况下数据都是 shared,设为 none 表示必须指定数据是否 sharedprivate(list)
:私有数据列表firstprivate(list)
:私有数据继承前面的值shared(list)
:共享数据列表copyin(list)
reduction([reduction-modifier ,] reduction-identifier : list)
:对此数据做规约操作proc_bind(master | close | spread)
:与 CPU 核的绑定方式allocate([allocator :] list)
:选择 allocator
worksharing construct
共享工作结构:被一组 threads 恰好执行一次。For C/C++, worksharing constructs are for, sections, and single.
Sections
#pragma omp sections
{
#pragma omp section
//one calculation
...
#pragma omp section
//another calculation
}
Single
#pragma omp single
{
...
}
For
#pragma omp for [clause[ [,] clause] ... ]
for-loops{
...
}
clause 有如下的常用选择:
private(list)
:私有变量列表firstprivate(list)
:私有数据继承前面的值lastprivate([lastprivate-modifier:] list)
:私有数据,最后一次迭代结果写回原变量linear(list[: linear-step])
reduction([reduction-modifier,]reduction-identifier : list)
:归约schedule([modifier [, modifier]:]kind[, chunk_size])
:调度方式collapse(n)
:合并 n 层循环后,再分配任务给各线程(注意,合并前后的parivate(list)
不一定相同)ordered[(n)]
nowait
:循环末尾不用等待其他线程(否则,默认在每轮循环末尾加 barrier 同步各线程)allocate([allocator :]list)
order(concurrent)
这条 #pragma omp for
标记将会在编译时,派生N个线程,每个线程有自己的上下文(私有数据、共享数据)
- 循环变量是私有数据
- 其他数据缺省均为共享(必要时手动设置
private(list)
)
一般地,可以将 parallel 与 for 合用
//分开写
void simple(int n, float *a, float *b) {
int i;
#pragma omp parallel num_threads(5)
{
#pragma omp for nowait
for (i = 1; i < n; i++) /* i is private by default */
b[i] = (a[i] + a[i – 1]) / 2.0;
}
}
//合用
void simple(int n, float *a, float *b) {
int i;
#pragma omp parallel for num_threads(5) nowait private(i)
for (i = 1; i < n; i++) /* i is private by default */
b[i] = (a[i] + a[i – 1]) / 2.0;
}
schedule()
可以有 static, dynamic, guided
等多种调度方式,
临界区 & 原子操作
Critical:临界区内的操作是原子的,无竞争
int i,j;
int b[N][M];
int x = 0;
#pragma omp parallel for private(j)
{
for(i = 0; i < N; i++){
int m = i * i;
for(j = 0; j < M, j++)
b[i][j] = m*j;
#pragma omp critical
{
x += b[i][j]; //临界区内,至多只有一个线程
}
}
}
Reduce:归约(类似 MPI),性能比 Critical 好
int i,j;
int b[N][M];
int x = 0;
#pragma omp parallel for private(j) reduction(+: x)
{
for(i = 0; i < N; i++){
int m = i * i;
for(j = 0; j < M, j++)
x += m*j;
}
}
Atomic:原子操作
#pragma omp atomic [clause[[[,] clause] ... ] [,]] atomic-clause [[,] clause [[[,] clause] ... ]]
expression-stmt
Atomic-clause 可以是 read, write, update, capture
Atomic 仅作用于制导语句下面的语句。与临界区相比,不要求仅有一个线程进入,只需保证所修饰语句操作的原子性。例如:
int x[n];
int i;
#pragma omp parallel for shared(x, y, index, n)
for (i=0; i<n; i++) {
#pragma omp atomic update
x[index[i]] += work1(i); //多个threads可以同时访问数组x[n]
y[i] += work2(i);
}
Task
使用 task 制导语句,定义任务,实现更加灵活的并行方式
- task 内可以再派生 task
- 可以自定义 task 的调度方式、优先级、任务间依赖关系
- Included task / undeterred task:串行化的任务
- 可以对任务加更多限制:mergeable,untied,final
int fib(int n){
int i, j;
if (n<2)
return n;
else{
#pragma omp task shared(i)
i=fib(n-1);
#pragma omp task shared(j)
j=fib(n-2);
#pragma omp taskwait
return i+j;
}
}
int main(){
#pragma omp parallel
#pragma omp master
fib(n); //只能由一个线程生成task,任务分给各个线程执行
}
Taskgroup:产生一组任务,在结构结尾加上 barrier 同步所有任务
#pragma omp taskgroup [clause[[,] clause] ...]
structured-block
clause 有如下的常用选择:
task_reduction(reduction-identifier : list)
:任务归约allocate([allocator :] list)
//递归生成task
void compute_tree(tree_type tree)
{
if (tree->left){
#pragma omp task
compute_tree(tree->left);
}
if (tree->right){
#pragma omp task
compute_tree(tree->right);
}
#pragma omp task
compute_something(tree);
}
int main()
{
int i;
tree_type tree;
init_tree(tree);
#pragma omp parallel
#pragma omp single
{
#pragma omp task
start_background_work();
for (i = 0; i < max_steps; i++) //由一个线程产生task
{
#pragma omp taskgroup
{
#pragma omp task
compute_tree(tree);
} //wait on tree traversal in this step
check_step();
}
} //only now is background work required to be complete
print_results();
return;
}