文章目录
- 1.提纲
- 2.第二章 并行硬件&程序设计
- 2.1 SIMD&MIMD
- 2.2 可扩展性
- 2.7 串行程序并行化(Foster四步:划分、通信、聚合、分配)
- 3.mpi
- 3.1 点对点
- gemm
- 3.2 集合通信
- gemm
- send/recv实现reduce
- send/recv 实现ring AllReduce
- 3.3 加速比
- 3.4 奇偶冒泡排序
- 4. pthread
- 4.1 gemm
- 4.2 临界区
- 1.忙等待
- 2.互斥量
- 3.信号量
- 4. 生产消费者模型
- 方程的根
- reader-writer
- 5.缓存一致性
- 6.伪共享
- 4.3 求pi: critical_sections problem
- 5.openmp
- 5.1 parallel
- 5.2 奇偶冒泡排序
- 5.3 数据依赖
- 6.GPU
1.提纲
实验
MPI:send,recv,点对点实现集合通信(伪代码)
pthread:gemm,求方程的根(生产消费),缓存一致性,临界区的限制
openmp:
gemm:MPI实现,pthread实现
第二章:并行硬件&程序设计
2.3 SIMD,MIMD,缓存一致性
2.4 分布式内存与共享式内存(MPI和pthread)
2.5 加速比、阿姆达尔定律、强弱可扩展性
2.7 串行程序并行化(Foster四步:划分、通信、聚合、分配)
第三章:MPI
点对点:send,recv
集合通信:MPI_Reduce/MPI_AllReduce(用send和recv实现),scatter,gather
3.6 MPI_Wtime:墙钟时间,算加速比(pthread要用系统时间,如 gettimeofday)
奇偶冒泡排序
第四章:Pthread
4.2 线程创建与销毁:pthread_create, pthread_join
4.3 临界区:解决方案与效率
忙等待:实现
互斥量:
信号量:
4.7 生产消费者模型
读写锁: https://cloud.tencent.com/developer/article/1021461
读多写少
4.10 缓存一致性 https://www.cnblogs.com/xiaolincoding/p/13886559.html
第五章: openmp
parallel for等语法: https://learn.microsoft.com/zh-cn/cpp/parallel/openmp/reference/openmp-directives?source=recommendations&view=msvc-170
数据依赖
奇偶冒泡排序
子句:Reduction, 锁 https://blog.csdn.net/qq_43219379/article/details/123911644
六、GPU: 概念解释
物理组织:GPC,TPC,SM,warp,SP
逻辑结构:grid,block,thread
2.第二章 并行硬件&程序设计
2.1 SIMD&MIMD
SIMD
MIMD
2.2 可扩展性
效率与可扩展:
强弱可扩展性:
2.7 串行程序并行化(Foster四步:划分、通信、聚合、分配)
3.mpi
3.1 点对点
send,recv
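补充:点对点通信的两个基本接口(标准 MPI API):
int MPI_Send(void* buf, int count, MPI_Datatype datatype, int dest, int tag, MPI_Comm comm)
buf 发送缓冲区的起始地址
count 发送的数据个数
datatype 数据类型
dest 目标进程号
tag 消息标签(需与接收方匹配)
comm 通信域
int MPI_Recv(void* buf, int count, MPI_Datatype datatype, int source, int tag, MPI_Comm comm, MPI_Status *status)
buf 接收缓冲区的起始地址
count 最多接收的数据个数
datatype 数据类型
source 源进程号(可用 MPI_ANY_SOURCE)
tag 消息标签(可用 MPI_ANY_TAG)
comm 通信域
status 接收状态(不关心时可传 MPI_STATUS_IGNORE)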
gemm
#include<stdio.h>
#include<stdlib.h>
#include<time.h>
#include<mpi.h>
#include<iostream>
// 随机初始化 rows*cols 的矩阵(原笔记未给出 gen_matrix 的定义,这里补一个简单实现)
void gen_matrix(double *mat, int rows, int cols){
    for (int i = 0; i < rows * cols; i++)
        mat[i] = rand() % 10;
}
int main(int argc, char * argv[] ){
double startTime, stopTime;
double *a, *b, *c;
int m,n,k;
m=atoi(argv[1]);
n=atoi(argv[2]);
k=atoi(argv[3]);
int rank, numprocs, lines;
MPI_Init(NULL,NULL);
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
lines = m/numprocs;//lines that each thread process
a = new double[m * n];
b = new double[n * k];
c = new double[m * k];
if( rank ==0 ){
double *c_buffer = new double[lines * k];
srand((unsigned)time(0));
gen_matrix(a,m,n);
gen_matrix(b,n,k);
startTime = MPI_Wtime();
//send data
for(int i=1;i<numprocs;i++){//send whole B
MPI_Send( b, n*k, MPI_DOUBLE, i, 0, MPI_COMM_WORLD );
}
for(int i=1;i<numprocs;i++){//send part A
MPI_Send( a + (i-1)*lines*n, n*lines, MPI_DOUBLE, i, 1, MPI_COMM_WORLD);
}
//thread 0 compute the last part A
for (int i = (numprocs - 1) * lines; i < m; i++)
{
for(int j=0;j<k;j++){
double temp = 0;
for(int t=0;t<n;t++)
temp += a[i*n+t]*b[t*k+j];
c[i*k+j] = temp;
}
}
//recv result
for (int t = 1; t < numprocs; t++)
{
MPI_Recv(c_buffer, lines * k, MPI_DOUBLE, t, 2, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
//copy ans to c
for(int i=0;i<lines;i++){
for(int j=0;j<k;j++){
c[((t - 1) * lines + i) * k + j] = c_buffer[i * k + j];
}
}
}
stopTime = MPI_Wtime();
printf("计算所用时间:%lfs\n",stopTime-startTime);
delete[] c_buffer;
}
else{
double *a_buffer = new double[n * lines];
double *b_buffer = new double[n * k];
double *local_ans=new double[lines*k];
MPI_Recv(b_buffer, n * k, MPI_DOUBLE, 0, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
MPI_Recv(a_buffer, n * lines, MPI_DOUBLE, 0, 1, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
for(int i=0;i<lines;i++){
for(int j=0;j<k;j++){
double temp=0;
for(int t=0;t<n;t++)
temp += a_buffer[i * n + t] * b_buffer[t * k + j];
local_ans[i * k + j] = temp;
}
}
MPI_Send(local_ans, lines * k, MPI_DOUBLE, 0, 2, MPI_COMM_WORLD);
delete[] a_buffer;
delete[] b_buffer;
delete[] local_ans;
}
delete[] a;
delete[] c;
delete[] b;
MPI_Finalize();
return 0;
}
3.2 集合通信
MPI_Reduce/MPI_AllReduce(用send和recv实现),scatter,gather
MPI API 参考: https://zhuanlan.zhihu.com/p/70703729
MPI API 参考: https://mpitutorial.com/tutorials/
广播:
int MPI_Bcast(void* buffer,int count,MPI_Datatype datatype,int root, MPI_Comm comm)
buffer 消息缓冲的起始地址
count 数据的数量
datatype 广播的数据类型
root 广播进程
comm 通信组
Scatter:
int MPI_Scatter(void* sendbuf, int sendcount, MPI_Datatype sendtype,void* recvbuf, int recvcount, MPI_Datatype recvtype,int root, MPI_Comm comm)
sendbuf 发送消息缓冲区的起始地址
sendcount 发送到各个进程的数据个数
sendtype 发送消息缓冲区中的数据类型
recvbuf 接收消息缓冲区的起始地址
recvcount 待接收的元素个数
recvtype 接收元素的数据类型
root 发送进程的序列号
comm 通信域
Gather:
int MPI_Gather(void* sendbuf, int sendcount, MPI_Datatype sendtype,void* recvbuf, int recvcount, MPI_Datatype recvtype,int root, MPI_Comm comm)
sendbuf 发送消息缓冲区的起始地址
sendcount 发送消息缓冲区中的数据个数
sendtype 发送消息缓冲区中的数据类型
recvbuf 接收消息缓冲区的起始地址(各个进程传回root的数据会自动按进程编号排列放到内存中)
recvcount 待接收的元素个数(每个进程的元素个数,而非所有线程总的元素个数)
recvtype 接收元素的数据类型
root 接收进程的序列号
comm 通信域
MPI_Reduce
MPI_Reduce(
void* send_data,
void* recv_data,
int count,
MPI_Datatype datatype,
MPI_Op op,//MPI_MAX, MPI_SUM...
int root,//the rank of root thread
MPI_Comm communicator)
例子:求和,然后计算平均值(摘自上面 mpitutorial 链接的片段,create_rand_nums、world_rank、world_size、num_elements_per_proc 在完整示例中定义):
float *rand_nums = NULL;
rand_nums = create_rand_nums(num_elements_per_proc);
// Sum the numbers locally
float local_sum = 0;
int i;
for (i = 0; i < num_elements_per_proc; i++) {
local_sum += rand_nums[i];
}
// Print the random numbers on each process
printf("Local sum for process %d - %f, avg = %f\n",
world_rank, local_sum, local_sum / num_elements_per_proc);
// Reduce all of the local sums into the global sum
float global_sum;
MPI_Reduce(&local_sum, &global_sum, 1, MPI_FLOAT, MPI_SUM, 0,
MPI_COMM_WORLD);
// Print the result
if (world_rank == 0) {
printf("Total sum = %f, avg = %f\n", global_sum,
global_sum / (world_size * num_elements_per_proc));
}
MPI_AllReduce: Combines values from all processes and distributes the result back to all processes.
//不需要根进程 ID(因为结果分配给所有进程)
MPI_Allreduce(
void* send_data,
void* recv_data,
int count,
MPI_Datatype datatype,
MPI_Op op,
MPI_Comm communicator)
MPI_Allreduce 等效于先执行 MPI_Reduce,然后执行 MPI_Bcast。
gemm
A用scatter给每个进程分发一部分,B用broadcast完整地广播到所有进程,计算完毕后,用gather收集C。
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <mpi.h>
#include <iostream>
// 同上:随机初始化矩阵(原笔记未给出 gen_matrix 的定义,这里补一个简单实现)
void gen_matrix(double *mat, int rows, int cols){
    for (int i = 0; i < rows * cols; i++)
        mat[i] = rand() % 10;
}
// 注意:此版本假设 m 能被进程数整除
int main(int argc, char *argv[])
{
double startTime, stopTime;
int m, n, k;
m = atoi(argv[1]);
n = atoi(argv[2]);
k = atoi(argv[3]);
int my_rank, numprocs, lines;
MPI_Init(NULL, NULL);
MPI_Comm_rank(MPI_COMM_WORLD, &my_rank);
MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
lines = m / numprocs;
double *a = new double[m * n];
double *b = new double[n * k];
double *c = new double[m * k];
double *local_a = new double[lines * n];
double *local_c = new double[m * k];
if (my_rank == 0)
{
//prepare data
srand((unsigned)time(0));
gen_matrix(a, m, n);
gen_matrix(b, n, k);
startTime = MPI_Wtime();
}
// Send: Scatter A, Bcast whole B
MPI_Scatter(a, lines * n, MPI_DOUBLE, local_a, lines * n, MPI_DOUBLE, 0, MPI_COMM_WORLD);
MPI_Bcast(b, n * k, MPI_DOUBLE, 0, MPI_COMM_WORLD);
// All processes involve calculation
for (int i = 0; i < lines; i++)
{
for (int j = 0; j < k; j++)
{
double temp = 0;
for (int t = 0; t < n; t++)
temp += local_a[i * n + t] * b[t * k + j];
local_c[i * k + j] = temp;
}
}
// Recv: Gather C
MPI_Gather(local_c, lines * k, MPI_DOUBLE, c, lines * k, MPI_DOUBLE, 0, MPI_COMM_WORLD);
if (my_rank == 0)
{
stopTime = MPI_Wtime();
printf("计算所用时间:%lfs\n", stopTime - startTime);
}
delete[] a;
delete[] b;
delete[] c;
delete[] local_a;
delete[] local_c;
MPI_Finalize();
return 0;
}
send/recv实现reduce
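原笔记此节未展开,这里补一个用 MPI_Send/MPI_Recv 实现 MPI_Reduce(以 MPI_SUM、double 为例)的最简草图(自拟示例,朴素实现:所有非根进程把本地数据发给根进程,由根进程逐个接收并累加;树形归约可把通信步数从 p-1 降到 log2(p),此处从简):
#include <stdlib.h>
#include <mpi.h>
// 用 Send/Recv 实现求和归约的朴素版本
void my_reduce_sum(const double *sendbuf, double *recvbuf, int count,
                   int root, MPI_Comm comm)
{
    int rank, size;
    MPI_Comm_rank(comm, &rank);
    MPI_Comm_size(comm, &size);
    if (rank == root) {
        double *tmp = (double *)malloc(count * sizeof(double));
        for (int i = 0; i < count; i++)
            recvbuf[i] = sendbuf[i];            // 先放入根进程自己的数据
        for (int src = 0; src < size; src++) {  // 逐个接收其他进程的数据并累加
            if (src == root) continue;
            MPI_Recv(tmp, count, MPI_DOUBLE, src, 0, comm, MPI_STATUS_IGNORE);
            for (int i = 0; i < count; i++)
                recvbuf[i] += tmp[i];
        }
        free(tmp);
    } else {
        MPI_Send((void *)sendbuf, count, MPI_DOUBLE, root, 0, comm);
    }
}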
send/recv 实现ring AllReduce
课堂测试题: 设10个进程,每个进程有长度10000的数组,给出基于MPI Send和 Recv函数实现Ring Allreduce函数的伪代码。
Ring allreduce 是一种带宽最优的 allreduce 通信算法。所有计算节点被安排在一个逻辑环中,每个节点有一个左邻和一个右邻,它只向右邻居发送数据,并从左邻居接收数据。
该算法分两个步骤进行:首先是scatter-reduce,然后是allgather。在scatter-reduce步骤中,GPU将交换数据,使每个GPU可得到最终结果的一个块。在allgather步骤中,gpu将交换这些块,以便所有gpu得到完整的最终结果。
第一阶段通过(N-1)步,让每个节点都得到1/N的完整数据块。每一步的通信耗时是α+S/(NB),计算耗时是(S/N)* C。 这一阶段也可视为scatter-reduce。
第二阶段通过(N-1)步,让所有节点的每个1/N数据块都变得完整。每一步的通信耗时也是α+S/(NB),没有计算。这一阶段也可视为allgather。
伪代码:
// 设共 N 个进程,本进程编号为 myrank;长度 10000 的数组被划分为 N 个块 block[0..N-1]
// 右邻居 right = (myrank+1)%N,左邻居 left = (myrank-1+N)%N
// 实际实现时 send/recv 应使用 MPI_Sendrecv(或非阻塞通信),避免环上所有进程同时阻塞发送造成死锁
// 阶段一 scatter-reduce:共 N-1 步,结束后每个进程拥有一个完整归约的块
For i = 0 -> N-2
    send_idx = (myrank - i + N) % N
    recv_idx = (myrank - i - 1 + N) % N
    send(block[send_idx], right)      // 向右邻居发送一个数据块
    recv(rc_buffer, left)             // 从左邻居接收一个数据块
    block[recv_idx] += rc_buffer      // 与本地对应块求和
// 阶段二 allgather:再经过 N-1 步,把完整的块沿环传播,使所有进程的每个块都完整
For i = 0 -> N-2
    send_idx = (myrank + 1 - i + N) % N
    recv_idx = (myrank - i + N) % N
    send(block[send_idx], right)
    recv(rc_buffer, left)
    block[recv_idx] = rc_buffer       // 用"全局和"覆盖本地数据块
3.3 加速比
MPI_Wtime 返回墙上时钟时间。回想一下,C 语言中的 clock 函数返回的是 CPU 时间(包括用户代码、库函数以及系统调用所消耗的时间),但它不包括空闲等待时间,而并行程序中很多时间都处于空闲等待状态,例如调用 MPI_Recv 会消耗很多时间来等待消息的到达。墙上时钟时间给出了所经历的全部时间,包括空闲等待时间。
加速比:S = T_串行 / T_并行,最理想(线性加速)是 S = p。
并行效率:E = S / p,即平均到每个进程的加速比。
例子:理论作业1 2.16
假定一个串行程序的运行时间为 T_串行 = n²,运行时间的单位为毫秒。并行程序的运行时间为 T_并行 = n²/p + log₂(p)。对于 n 和 p 的不同值,请写出一个程序并找出这个程序的加速比和效率。在 n = 10、20、40、…、320 和 p = 1、2、4、…、128 等不同情况下运行该程序。当 p 增加、n 保持恒定时,加速比和效率的情况分别如何?当 p 保持恒定而 n 增加呢?
假设 T_并行 = T_串行/p + T_开销,我们固定 p 的大小,并增加问题的规模。
请解释如果 T_开销 比 T_串行 增长得慢,随着问题规模的增加,并行效率也将增加。
请解释如果 T_开销 比 T_串行 增长得快,随着问题规模的增加,并行效率将降低。
- 1. 我跑的程序很简单,两个二维矩阵相加,这个程序的串行运行时间为 T_串行 = n²:
// 片段:a、b、c 为 N×N 的全局矩阵,core 为线程数,p 为本线程负责的起始行,每个线程计算 N/core 行
void add(int p)
{
    for (int i = p; i < p + N/core; i++) {
        for (int y = 0; y < N; y++) {   // 遍历该行的所有 N 列
            c[i][y] = a[i][y] + b[i][y];
        }
    }
}
这个的时间复杂度就是 n²,我的并行策略就是 p 个线程,每个计算 n/p 行。
- n 保持恒定、p 增加时,实验结果:加速比先增加后降低,效率降低
- p 保持恒定、n 增加时,实验结果:加速比和效率都增加
- 效率公式:
$$Efficiency=\frac{T_{串行}}{T_{并行}\times p}= \frac{T_{串行}}{\left(\frac{T_{串行}}{p}+T_{开销}\right)\times p}=\frac{T_{串行}}{T_{串行}+T_{开销}\times p}=\frac{1}{1+\frac{T_{开销}}{T_{串行}}\times p}$$
于是显而易见
- 若 T_开销 比 T_串行 增长得慢,则 T_开销/T_串行 随 n 增大而减小,效率增加
- 若 T_开销 比 T_串行 增长得快,则 T_开销/T_串行 随 n 增大而增大,效率降低
3.4 奇偶冒泡排序
奇偶排序的串行实现(C++ 片段,swap 为 std::swap):
void odd_even_sort(int a[],int n){
for(int phase=0;phase<n;phase++ ){
if(phase%2==0){//even phase
for (int i = 1; i < n; i += 2)
{
if (a[i - 1] > a[i])
{
swap(a[i - 1], a[i]);
}
}
}else{//odd phase
for (int i = 1; i < n-1; i += 2)
{
if (a[i] > a[i+1])
{
swap(a[i], a[i+1]);
}
}
}
}
}
并行化思路:每个进程持有 n/p 个元素并先做本地排序;随后进行 comm_sz 个阶段,每个阶段与奇偶伙伴进程交换全部本地数据,编号小的一方保留合并后较小的一半,编号大的一方保留较大的一半。
伪代码:
sort local keys;// 对本地数据进行排序
for(int phase = 0; phase < comm_sz; phase++){// 共 comm_sz 个阶段
partnerID = Get_Partner(my_rank, phase);//获取数据交换的伙伴进程编号
if(partnerID != -1 && partnerID != comm_sz){// 如果获取到的进程合法
// 进行数据交换
send my keys to partner;
recv keys from partner;
if(my_rank < partnerID){//进程号小于通信编号,保存较小的一半数据
merge_low();//合并两个进程的结果,保存较小的一半数据
}
else{// 否则,保存较大的一半数据
merge_high();
}
}
}
一些具体函数:
1.合并两个有序数组并保留一半
/*该函数用来合并两个进程的数据,并取较小的一半数据*/
void merge_low(int A[], int B[], int local_n)
{
int *ans = new int[local_n]; // 临时数组
int p_a = 0, p_b = 0, i = 0; // 分别为 A、B、ans 三个数组的下标
while (i < local_n)
{
if (A[p_a] <= B[p_b])
ans[i++] = A[p_a++];
else
ans[i++] = B[p_b++];
}
// copy
for (i = 0; i < local_n; i++)
A[i] = ans[i];
delete[] ans;
}
对于Merge_High,只要改下比较符号
2.Get_Partner
/*该函数用来获得某个阶段,某个进程的通信伙伴*/
int Get_Partner(int my_rank, int phase)
{
int partnerID;
if (phase % 2 == 0)//even phase
{
if (my_rank % 2 != 0)//odd rank
partnerID = my_rank - 1;
else
partnerID = my_rank + 1;
}else{//odd phase
if (my_rank % 2 != 0)
partnerID = my_rank + 1;
else
partnerID = my_rank - 1;
}
return partnerID;
}
3. MPI_Sendrecv 接口
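MPI_Sendrecv 的原型如下(标准 MPI 接口):一次调用同时完成一次发送和一次接收,由 MPI 负责调度,不会像手写的一对阻塞 Send/Recv 那样因双方都先发送而互相等待;下面的完整代码就用它与伙伴进程交换本地数据块:
int MPI_Sendrecv(void *sendbuf, int sendcount, MPI_Datatype sendtype, int dest, int sendtag,
                 void *recvbuf, int recvcount, MPI_Datatype recvtype, int source, int recvtag,
                 MPI_Comm comm, MPI_Status *status)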
4.完整代码
#include <iostream>
#include <algorithm>
#include <random>
#include <time.h>
#include "mpi.h"
using namespace std;
#define LEN 16 // 数组长度
/*该函数用来合并两个进程的数据,并取较小的一半数据*/
void Merge_Low(int A[], int B[], int local_n);
/*该函数用来合并两个进程的数据,并取较大的一半数据*/
void Merge_High(int A[], int B[], int local_n);
/*该函数用来获得某个阶段,某个进程的通信伙伴*/
int Get_Partner(int my_rank, int phase);
void print_array(int A[],int n);
/*
mpicxx -Wall parallel_odd_even_sort.cpp -o odd_even_sort
mpiexec -n 8 ./odd_even_sort
*/
int main()
{
int local_n; // 各个进程中数组的大小
int *A, *B; // A为进程中保存的数据,B为进程通信中获得的数据
int comm_sz, my_rank;
MPI_Init(NULL, NULL);
// 获得进程数和当前进程的编号
MPI_Comm_size(MPI_COMM_WORLD, &comm_sz);
MPI_Comm_rank(MPI_COMM_WORLD, &my_rank);
// 计算每个进程应当获得的数据量,动态开辟空间
local_n = LEN / comm_sz;
A = new int[local_n];
B = new int[local_n];
// 随机产生数据并分发至各个进程
int *a = NULL;
if (my_rank == 0){
a = new int[LEN];
// srand((int)time(0));
srand(10);
for (int i = 0; i < LEN; i++)
{
a[i] = rand() % 10;
}
MPI_Scatter(a, local_n, MPI_INT, A, local_n, MPI_INT, 0, MPI_COMM_WORLD);
delete[] a;
}else{
MPI_Scatter(a, local_n, MPI_INT, A, local_n, MPI_INT, 0, MPI_COMM_WORLD);
}
// 先有序化本地数据
sort(A, A + local_n);
// 定理:如果p个进程运行奇偶排序算法,那么p个阶段后,输入列表有序
for (int phase = 0; phase < comm_sz; phase++)
{
int partnerID = Get_Partner(my_rank, phase); // 交换数据的进程号
if (partnerID != -1 && partnerID != comm_sz) // 如果本次数据交换的进程号有效
{
// 与对方进程交换数据
//MPI_Sendrecv(my_keys, n / comm_sz, MPI_INT, partnerID, 0, recv_keys, n / comm_sz, MPI_INT, partnerID, 0, MPI_COMM_WORLD, MPI_STATUSES_IGNORE);
MPI_Sendrecv(A, local_n, MPI_INT, partnerID, 0, B, local_n, MPI_INT, partnerID, 0, MPI_COMM_WORLD, MPI_STATUSES_IGNORE);
if (my_rank < partnerID) // 进程号小于通信编号,保存较小的一半数据
{
Merge_Low(A, B, local_n);
}else{// 否则,保存较大的一半数据
Merge_High(A, B, local_n);
}
}
}
// 收集排序后的数组
a = NULL;
if (my_rank == 0) // 0号进程接收各个进程的A的分量,并输出
{
a = new int[LEN];
MPI_Gather(A, local_n, MPI_INT, a, local_n, MPI_INT, 0, MPI_COMM_WORLD);
print_array(a,LEN);
delete[] a;
}
else // 其余进程将本地的 A 发送至 0 号进程
{
MPI_Gather(A, local_n, MPI_INT, a, local_n, MPI_INT, 0, MPI_COMM_WORLD);
}
MPI_Finalize();
return 0;
}
/*该函数用来合并两个进程的数据,并取较小的一半数据*/
void Merge_Low(int A[], int B[], int local_n)
{
int *ans = new int[local_n]; // 临时数组
int p_a = 0, p_b = 0, i = 0; // 分别为 A、B、ans 三个数组的下标
while (i < local_n)
{
if (A[p_a] <= B[p_b])
ans[i++] = A[p_a++];
else
ans[i++] = B[p_b++];
}
// copy
for (i = 0; i < local_n; i++)
A[i] = ans[i];
delete[] ans;
}
/*该函数用来合并两个进程的数据,并取较大的一半数据*/
void Merge_High(int A[], int B[], int local_n)
{
int *ans = new int[local_n]; // 临时数组
int p_a = 0, p_b = 0, i = 0; // 分别为 A、B、ans 三个数组的下标
// 这里不用担心数组越界:每次循环 i = p_a + p_b 且 i < local_n,因此 p_a、p_b 始终小于 local_n
while (i < local_n)
{
if (A[p_a] >= B[p_b])
{
ans[i++] = A[p_a++];
}
else
{
ans[i++] = B[p_b++];
}
}
// copy
for (i = 0; i < local_n; i++)
A[i] = ans[i];
delete[] ans;
}
/*该函数用来获得某个阶段,某个进程的通信伙伴*/
int Get_Partner(int my_rank, int phase)
{
int partnerID;
if (phase % 2 == 0) // even phase
{
if (my_rank % 2 != 0) // odd rank
partnerID = my_rank - 1;
else
partnerID = my_rank + 1;
}
else
{ // odd phase
if (my_rank % 2 != 0)
partnerID = my_rank + 1;
else
partnerID = my_rank - 1;
}
return partnerID;
}
void print_array(int A[], int n)
{
for (int i = 0; i < n; i++)
{
cout << A[i] << "\t";
if (i % 4 == 3)
{
cout << endl;
}
}
}
4. pthread
4.1 gemm
和MPI类似,按矩阵A行划分,每个thread计算一部分行。为了局部性,把矩阵B转置成按列分布,这样每个thread访问的数据有局部性。
核心代码:
void *pthread_gemm(void *id)
{
long tid = (long)id;
int start_row = tid * thread_rows;
int end_row = (tid + 1) * thread_rows - 1;
if (end_row >= M)
end_row = M - 1;
for (int i = start_row; i <= end_row; i++)
{
for (int j = 0; j < K; j++)
{
int temp = 0;
for (int k = 0; k < N; k++)
temp += A[i * N + k] * B[j * N + k];
C[i * K + j] = temp; // C 是 M*K 矩阵,行宽为 K
}
}
    return NULL; // pthread 线程函数需要返回 void*
}
int thread_num;
int thread_rows;
int *A, *B, *C;
int M = 0, N = 0, K = 0;
//对B转置N,K -> K,N
void transpose_matrix();
void transpose(int *matrix, int m, int n);
void Gen_Matrix();
void *pthread_gemm(void *id);
// gcc lab4_1.c -o lab4_1 -lpthread
int main(int argc, char *argv[])
{
scanf("%d", &thread_num);
scanf("%d %d %d", &M, &N, &K);
Gen_Matrix();
pthread_t *thread_handles;
thread_rows = M / thread_num; //每个thread要处理的行数
thread_handles = malloc(thread_num * sizeof(pthread_t));
for (long thread = 0; thread < thread_num; thread++)
pthread_create(&thread_handles[thread], NULL, pthread_gemm, (void *)thread);
for (long thread = 0; thread < thread_num; thread++)
pthread_join(thread_handles[thread], NULL);
}
完整代码:
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <sys/time.h>
int thread_num;
int thread_rows;
int *A, *B, *C;
int M = 0, N = 0, K = 0;
//对B转置N,K -> K,N
void transpose_matrix();
void transpose(int *matrix, int m, int n);
void Gen_Matrix();
void *pthread_gemm(void *id);
// gcc lab4_1.c -o lab4_1 -lpthread
int main(int argc, char *argv[])
{
// thread_num=4;
// M=N=K=64;
printf("input thread num: ");
scanf("%d", &thread_num);
printf("input M,N,K: ");
scanf("%d %d %d", &M, &N, &K);
Gen_Matrix();
struct timeval start;
struct timeval end;
pthread_t *thread_handles;
thread_rows = M / thread_num; //每个thread要处理的行数
thread_handles = malloc(thread_num * sizeof(pthread_t));
gettimeofday(&start, NULL);
for (long thread = 0; thread < thread_num; thread++)
pthread_create(&thread_handles[thread], NULL, pthread_gemm, (void *)thread);
for (long thread = 0; thread < thread_num; thread++)
pthread_join(thread_handles[thread], NULL);
gettimeofday(&end, NULL);
double time_use = (end.tv_sec - start.tv_sec) * 1000000 + (end.tv_usec - start.tv_usec);
printf("time used : %f millisecond\n", time_use / 1000);
free(thread_handles);
free(A);
free(B);
free(C);
return 0;
}
void transpose(int *matrix, int m, int n){
int *temp = malloc(sizeof(int) * m * n);
for (int i = 0; i < n; i++)
{
for (int j = 0; j < m; j++)
{
temp[i * m + j] = matrix[j * n + i];
}
}
free(matrix);
matrix = temp; // 注意:matrix 按值传递,这里的赋值不会影响调用者的指针,实际使用的是下面的 transpose_matrix()
}
void transpose_matrix()
{
int *temp = (int *)malloc(sizeof(int) * N * K);
int i, j;
for (i = 0; i < K; i++)
{
for (j = 0; j < N; j++)
{
temp[i * N + j] = B[j * K + i];
}
}
free(B);
B = temp;
}
void Gen_Matrix()
{
int i, j;
A = (int *)malloc(sizeof(int) * M * N);
B = (int *)malloc(sizeof(int) * N * K);
C = (int *)malloc(sizeof(int) * M * K);
for (i = 0; i < M; i++)
for (j = 0; j < N; j++)
A[i * N + j] = rand() % 10;
for (i = 0; i < N; i++)
for (j = 0; j < K; j++)
B[i * K + j] = rand() % 10;
// transpose(B,N,K);
transpose_matrix();
}
void *pthread_gemm(void *id)
{
long tid = (long)id;
int start_row = tid * thread_rows;
int end_row = (tid + 1) * thread_rows - 1;
if (end_row >= M)
end_row = M - 1;
for (int i = start_row; i <= end_row; i++)
{
for (int j = 0; j < K; j++)
{
int temp = 0;
for (int k = 0; k < N; k++)
temp += A[i * N + k] * B[j * N + k];
C[i * K + j] = temp; // C 是 M*K 矩阵,行宽为 K
}
}
    return NULL; // pthread 线程函数需要返回 void*
}
4.2 临界区
1.忙等待
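基本思想(下文 4.3 求 pi 的忙等待版本就是这样做的):每个线程进入临界区前自旋等待共享的 flag 变成自己的编号,离开时把 flag 交给下一个线程:
while (flag != my_rank);            /* 自旋空转,等待轮到自己 */
sum += my_sum;                      /* 临界区 */
flag = (flag + 1) % thread_count;   /* 把进入权交给下一个线程 */
缺点:等待的线程一直占用 CPU 空转;并且要防止编译器把空循环优化掉(见 4.3 中对 flag 加 volatile 的说明)。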
2.互斥量
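与下面的信号量类似,这里补上互斥量的基本接口(标准 pthread API,4.3 求 pi 的 mutex 版本会用到):
int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr);
功能:初始化互斥量,attr 为 NULL 时使用默认属性,成功返回 0。
int pthread_mutex_lock(pthread_mutex_t *mutex);
功能:加锁;若互斥量已被其他线程持有,则阻塞等待。
int pthread_mutex_unlock(pthread_mutex_t *mutex);
功能:解锁,并唤醒等待该互斥量的线程。
int pthread_mutex_destroy(pthread_mutex_t *mutex);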
3.信号量
semaphore API
int sem_init(sem_t *sem, int pshared, unsigned int value);
参数:
sem 指定了要初始化的信号量的地址
pshared 0 多线程 非0 多进程
value 指定了信号量的初始值
返回值: 成功 0,错误 -1
int sem_post(sem_t *sem);
功能:将信号量的值加 1。如果有进程或线程正在等待该信号量变为大于 0,则将其唤醒,被唤醒者继续执行并对信号量的值减 1。
int sem_wait(sem_t *sem);
功能:将信号量的值减 1。如果当前信号量的值大于 0,立即返回;如果当前信号量的值等于 0,则阻塞,直到信号量的值变为大于 0。
int sem_destroy(sem_t *sem);
4. 生产消费者模型
方程的根
完整代码
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <semaphore.h>
#include <math.h>
#include <sys/time.h>
int a,b,c;//parameter
// 4 thread: 0-thread for b2, 1-thread for 4ac, 2-thread for sqrt, 3-thread: divide
// producer and consumer: 2-thread wait for [0,1]-thread, 3-thread wait for 2-thread
int thread_num = 4;
void *fun_b2(void* rank);
void *fun_4ac(void *rank);
void *fun_sqrt(void *rank);
void *fun_divide(void *rank);
double res1,res2,res3;//result of [0,1,2] threads
double x1, x2; // result of 3-thread
int has_answer=0;//whether equation has answer
sem_t sem1; // 2-thread wait for [0,1]-thread
sem_t sem2; // 3-thread wait for 2-thread
/*
semaphore API:
int sem_init(sem_t *sem, int pshared, unsigned int value);
参数:
sem 指定了要初始化的信号量的地址
pshared 0 多线程 非0 多进程
value 指定了信号量的初始值
返回值: 成功 0,错误 -1
int sem_post(sem_t *sem);
功能:信号量的值加1操作.如果因此变为大于0.等待信号量的值变为大于0的进程或线程被唤醒,继续对信号量的值减一.
int sem_wait(sem_t *sem);
功能:减一操作 如果当前信号的值大于0,继续立即返回.
如果当前信号量的值等于0.阻塞,直到信号量的值变为大于0.
int sem_destroy(sem_t *sem);
*/
// gcc lab4_3.c -o lab4_3 -lpthread -lm
int main(int argc, char *argv[])
{
printf("Please input a,b,c: ");
scanf("%d%d%d", &a, &b, &c);
struct timeval start;
struct timeval end;
pthread_t *thread_handles = malloc(thread_num * sizeof(pthread_t));
sem_init(&sem1, 0, 0); // 初值为 0:2 号线程必须等 0、1 号线程各 post 一次才能继续
sem_init(&sem2, 0, 0); // 初值为 0:3 号线程必须等 2 号线程 post 一次才能继续
gettimeofday(&start, NULL);
pthread_create(&thread_handles[0], NULL, fun_b2, (void *)0);
pthread_create(&thread_handles[1], NULL, fun_4ac, (void *)1);
pthread_create(&thread_handles[2], NULL, fun_sqrt, (void *)2);
pthread_create(&thread_handles[3], NULL, fun_divide, (void *)3);
for (long thread = 0; thread < thread_num; thread++)
pthread_join(thread_handles[thread], NULL);
gettimeofday(&end, NULL);
sem_destroy(&sem1);
sem_destroy(&sem2);
if(has_answer){
printf("x1 = %lf, x2 = %lf\n", x1, x2);
}else{
printf("No answer!\n");
}
double time_use = (end.tv_sec - start.tv_sec) * 1000000 + (end.tv_usec - start.tv_usec);
printf("time used : %f millisecond\n", time_use / 1000);
free(thread_handles);
return 0;
}
void *fun_b2(void *rank){
    res1 = b * b;
    sem_post(&sem1);//call 2-thread
    return NULL;
}
void *fun_4ac(void *rank){
    res2 = 4 * a * c;
    sem_post(&sem1); // call 2-thread
    return NULL;
}
void *fun_sqrt(void *rank){
    //wait for 0-thread and 1-thread
    sem_wait(&sem1);
    sem_wait(&sem1);
    double tmp = res1 - res2; // 判别式 b^2-4ac,用 double 避免截断
    if(tmp >= 0){
        has_answer = 1;
        res3 = sqrt(tmp);
    }
    sem_post(&sem2);
    return NULL;
}
void *fun_divide(void *rank){
    //wait for 2-thread
    sem_wait(&sem2);
    if(has_answer){
        double denominator = 2.0 * a;   // 用 double 做除法,避免整型截断
        x1 = ((-b) + res3) / denominator;
        x2 = ((-b) - res3) / denominator;
    }
    return NULL;
}
reader-writer
三个冲突:
1.reader <--> reader
2.reader <--> writer
3.writer <--> writer
version 1 :reader first
两个锁:
- basic_lock 保护共享计数器 readerNum,解决冲突 1(多个读者并发更新计数器)
- write_lock 保证读写不同时进行、且同一时刻只有一个 writer 进入 critical section,解决冲突 2、3
一旦一个读者获得了readlock,其他的读者也可以获取这个readlock。但是,想要获取writelock的线程,就必须等到所有的读者都结束。
typedef struct __rwlock_t {
sem_t basic_lock;//for syncing changes to shared variable readerNum
sem_t write_lock;//used to allow one writer or many readers
int readerNum;//count of readers in critical section
} rwlock_t;
void rwlock_init(rwlock_t *rw) {
rw->readerNum=0;
sem_init(&rw->basic_lock,0,1);//init with 1
sem_init(&rw->write_lock,0,1);//init with 1
}
void rwlock_acquire_readlock(rwlock_t *rw) {
sem_wait(&rw->basic_lock);
rw->readerNum++;
if(rw->readerNum==1)
sem_wait(&rw->write_lock);//first reader acquires write_lock
sem_post(&rw->basic_lock);
}
void rwlock_release_readlock(rwlock_t *rw) {
sem_wait(&rw->basic_lock);
rw->readerNum--;
if(rw->readerNum==0)
sem_post(&rw->write_lock);//last reader releases write_lock
sem_post(&rw->basic_lock);
}
void rwlock_acquire_writelock(rwlock_t *rw) {
sem_wait(&rw->write_lock);
}
void rwlock_release_writelock(rwlock_t *rw) {
sem_post(&rw->write_lock);
}
存在 starvation: 当存在reader时,writer不能写,如果有无数个reader,writer只能一直等待,会饿死。
原因:reader优先级更高
version 2 : writer first
对称的,可以让writer优先级更高,但reader可能会饿死。
参考: https://en.wikipedia.org/wiki/Readers%E2%80%93writers_problem
int readcount, writecount; //(initial value = 0)
semaphore rmutex, wmutex, readTry, resource; //(initial value = 1)
//READER
reader() {
<ENTRY Section>
readTry.P(); //Indicate a reader is trying to enter
rmutex.P(); //lock entry section to avoid race condition with other readers
readcount++; //report yourself as a reader
if (readcount == 1) //checks if you are first reader
resource.P(); //if you are first reader, lock the resource
rmutex.V(); //release entry section for other readers
readTry.V(); //indicate you are done trying to access the resource
<CRITICAL Section>
//reading is performed
<EXIT Section>
rmutex.P(); //reserve exit section - avoids race condition with readers
readcount--; //indicate you're leaving
if (readcount == 0) //checks if you are last reader leaving
resource.V(); //if last, you must release the locked resource
rmutex.V(); //release exit section for other readers
}
//WRITER
writer() {
<ENTRY Section>
wmutex.P(); //reserve entry section for writers - avoids race conditions
writecount++; //report yourself as a writer entering
if (writecount == 1) //checks if you're first writer
readTry.P(); //if you're first, then you must lock the readers out. Prevent them from trying to enter CS
wmutex.V(); //release entry section
resource.P(); //reserve the resource for yourself - prevents other writers from simultaneously editing the shared resource
<CRITICAL Section>
//writing is performed
resource.V(); //release file
<EXIT Section>
wmutex.P(); //reserve exit section
writecount--; //indicate you're leaving
if (writecount == 0) //checks if you're the last writer
readTry.V(); //if you're last writer, you must unlock the readers. Allows them to try enter CS for reading
wmutex.V(); //release exit section
}
version 3 : fair reading and writing
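一种常见的公平方案(对应"第三类读者-写者问题")是在 version 1 的基础上增加一个排队信号量 service_queue:读者和写者都必须先在队列上排队,大致按到达顺序获得服务,从而双方都不会饿死(公平程度取决于信号量按等待顺序唤醒)。下面是一个示意性草图(自拟,service_queue 为新增成员):
#include <semaphore.h>
typedef struct {
    sem_t service_queue; // 公平排队:读者、写者都先在此排队(初值 1)
    sem_t basic_lock;    // 保护 readerNum(初值 1)
    sem_t write_lock;    // 资源锁:一个写者或一批读者(初值 1)
    int readerNum;       // 临界区内的读者数
} fair_rwlock_t;
void fair_acquire_readlock(fair_rwlock_t *rw) {
    sem_wait(&rw->service_queue);   // 先排队:排在前面的写者不会被后来的读者越过
    sem_wait(&rw->basic_lock);
    if (++rw->readerNum == 1)
        sem_wait(&rw->write_lock);  // 第一个读者锁住资源
    sem_post(&rw->service_queue);   // 让下一个等待者(读者或写者)继续排队
    sem_post(&rw->basic_lock);
}
void fair_release_readlock(fair_rwlock_t *rw) {
    sem_wait(&rw->basic_lock);
    if (--rw->readerNum == 0)
        sem_post(&rw->write_lock);  // 最后一个读者释放资源
    sem_post(&rw->basic_lock);
}
void fair_acquire_writelock(fair_rwlock_t *rw) {
    sem_wait(&rw->service_queue);   // 排队
    sem_wait(&rw->write_lock);      // 独占资源
    sem_post(&rw->service_queue);   // 拿到资源后把队列让给后面的线程
}
void fair_release_writelock(fair_rwlock_t *rw) {
    sem_post(&rw->write_lock);
}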
5.缓存一致性
- 写直达
保持内存与 Cache 一致性最简单的方式是,把数据同时写入内存和 Cache 中,这种方法称为写直达(Write Through)。
- 写回
写直达每次写操作都要把数据写回内存,影响性能;为了减少数据写回内存的频率,出现了写回(Write Back)的方法。
在写回机制中,当发生写操作时,新的数据仅仅被写入 Cache Block 里,只有当修改过的 Cache Block「被替换」时才需要写到内存中,减少了数据写回内存的频率,这样便可以提高系统的性能。
- 问题描述
现在 CPU 都是多核的,由于 L1/L2 Cache 是多个核心各自独有的,就会带来多核心的缓存一致性(Cache Coherence)问题;如果不能保证缓存一致性,就可能造成计算结果错误。
那缓存一致性的问题具体是怎么发生的呢?我们以一个含有两个核心的 CPU 作为例子看一看。
假设 A 号核心和 B 号核心同时运行两个线程,都操作共同的变量 i(初始值为 0 )。
这时如果 A 号核心执行了 i++
语句的时候,为了考虑性能,使用了我们前面所说的写回策略,先把值为 1
的执行结果写入到 L1/L2 Cache 中,然后把 L1/L2 Cache 中对应的 Block 标记为脏的,这个时候数据其实没有被同步到内存中的,因为写回策略,只有在 A 号核心中的这个 Cache Block 要被替换的时候,数据才会写入到内存里。
如果这时旁边的 B 号核心尝试从内存读取 i 变量的值,则读到的将会是错误的值,因为刚才 A 号核心更新 i 值还没写入到内存中,内存中的值还依然是 0。这个就是所谓的缓存一致性问题,A 号核心和 B 号核心的缓存,在这个时候是不一致,从而会导致执行结果的错误。
要想实现缓存一致性,关键是要满足 2 点:
- 第一点是写传播,也就是当某个 CPU 核心发生写入操作时,需要把该事件广播通知给其他核心;
- 第二点是事务的串行化(transaction serialization),这个很重要,只有保证了这一点,才能保障数据是真正一致的,程序在各个不同核心上运行的结果也才是一致的;
基于总线嗅探机制的 MESI 协议,就满足上面了这两点,因此它是保障缓存一致性的协议。
MESI 协议,是已修改(Modified)、独占(Exclusive)、共享(Shared)、已失效(Invalid)这四个状态的英文首字母组合。整个 MESI 状态的变更,是根据来自本地 CPU 核心的请求,或者来自其他 CPU 核心通过总线传输过来的请求,构成一个流动的状态机。另外,对于处于「已修改」或「独占」状态的 Cache Line,修改更新其数据不需要发送广播给其他 CPU 核心。
- 解决方法
总线嗅探(监听):某个核心修改了某个数据时,会把该事件通过总线广播出去;其他核心一直在监听(嗅探)总线,据此更新或作废自己 Cache 中的副本。
目录:维护一个目录(可以组织成层次/树状结构),记录每个 Cache Block 被哪些核心缓存;写操作只需通知目录中登记的核心,避免向所有核心广播。
6.伪共享
这个问题被称为 False Sharing(伪共享)。每个 CPU 核心的 Cache 会把内存中同一段地址区域(同一个 Cache Line)的内容整块拷贝进来,并由一致性协议保持这段内容一致。当 CPU 0 修改了这段区域中的第一个变量时,为了保持一致,必须先使 CPU 1 的 Cache 中对应的 Cache Line 失效(或更新),然后才能继续;接着 CPU 1 又修改了这段区域中的第二个变量,同样会使 CPU 0 的对应 Cache Line 失效。两个核心写的其实是不同的变量,却因为落在同一个 Cache Line 上而来回不停地使对方的 Cache 失效,导致程序执行很慢。
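常见的缓解方法:让不同线程频繁写的变量落到不同的 Cache Line 上,例如按 Cache Line 大小做填充(padding)或对齐。下面是一个示意草图(自拟示例,假设 Cache Line 为 64 字节,实际大小以硬件为准):
#include <stdio.h>
#include <pthread.h>
#define CACHE_LINE 64    // 假设的 Cache Line 大小
#define NUM_THREADS 4
// 每个线程一个计数器,并填充到一个完整的 Cache Line,
// 避免相邻线程的计数器落在同一个 Cache Line 上引发伪共享
struct padded_counter {
    long value;
    char pad[CACHE_LINE - sizeof(long)];
};
struct padded_counter counters[NUM_THREADS];
void *worker(void *arg)
{
    long tid = (long)arg;
    for (long i = 0; i < 100000000L; i++)
        counters[tid].value++;   // 各写各的 Cache Line,互不使对方失效
    return NULL;
}
// gcc false_sharing.c -O2 -lpthread(文件名仅为示例)
int main(void)
{
    pthread_t t[NUM_THREADS];
    for (long i = 0; i < NUM_THREADS; i++)
        pthread_create(&t[i], NULL, worker, (void *)i);
    for (int i = 0; i < NUM_THREADS; i++)
        pthread_join(t[i], NULL);
    printf("counters[0] = %ld\n", counters[0].value);
    return 0;
}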
4.3 求pi: critical_sections problem
// gcc pi.c -o pi -Wall -O3 -lpthread; ./pi 2 10
/*
pi=4(1-1/3+1/5-1/7+...+((-1)^n)*(1/(2n+1))+...)
critical_sections problem
bussy waiting / mutex / semaphore
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <sys/time.h>
#include <pthread.h>
#include <semaphore.h>
int thread_count; // thread's num
int n = 1000000; // 10^6
double sum = 0.0;
volatile int flag = 0; // 忙等待的轮转标志;加 volatile 防止 -O3 下自旋循环被优化掉(更严格应使用原子操作/内存屏障)
// method:Busy-Waiting, but it will continually use the CPU accomplishing nothing
void *getSum_bussy_waiting(void *rank);
void test_bussy_waiting();
// method:Mutexes
pthread_mutex_t mutex;//global mutex
void *getSum_mutex(void *rank);
void test_mutex();
// method:semaphore
sem_t sem;
void *getSum_semaphore(void *rank);
void test_semaphore();
int main(int argc, char *argv[])
{
// help
if (argc == 2 && (!strcmp(argv[1],"--help") || !strcmp(argv[1],"-H")) )
{
printf("-----------help massege------------\n");
printf("usage: ./pi Strategy NumberOfThread\n");
printf("Strategy: 1 for bussyWaiting, 2 for mutex, 3 for semahpore\n");
return 0;
}
long Strategy = strtol(argv[1], NULL, 10);
thread_count = strtol(argv[2], NULL, 10);
if (Strategy==1){
printf("bussy waiting:\n");
test_bussy_waiting();
}
else if (Strategy==2){
printf("mutex:\n");
test_mutex();
}else if(Strategy==3){
printf("semaphore:\n");
test_semaphore();
}
return 0;
} // main
void *getSum_bussy_waiting(void *rank)
{
long my_rank = (long)rank;
double factor, my_sum = 0.0;
long long i;
long long my_n = n / thread_count;
long long my_first_i = my_n * my_rank;
long long my_last_i = my_first_i + my_n;
if (my_first_i % 2 == 0)
factor = 1.0;
else
factor = -1.0;
for (i = my_first_i; i < my_last_i; i++, factor = -factor)
{
my_sum += factor / (2 * i + 1);
}
// Use Busy-Waiting to solve critical sections after loop
while (flag != my_rank)
;
sum += my_sum;
flag = (flag + 1) % thread_count;
return NULL;
}
void test_bussy_waiting()
{
struct timeval start;
struct timeval end;
// Use long in case of 64-bit system
long thread;
pthread_t *thread_handles;
gettimeofday(&start, NULL);
// Get number of threads from command line
thread_handles = malloc(thread_count * sizeof(pthread_t));
for (thread = 0; thread < thread_count; thread++)
{
// Create threads
pthread_create(&thread_handles[thread], NULL, getSum_bussy_waiting, (void *)thread);
}
for (thread = 0; thread < thread_count; thread++)
{
// Wait until thread_handles[thread] complete
pthread_join(thread_handles[thread], NULL);
}
gettimeofday(&end, NULL);
free(thread_handles);
long long startusec = start.tv_sec * 1000000 + start.tv_usec;
long long endusec = end.tv_sec * 1000000 + end.tv_usec;
double elapsed = (double)(endusec - startusec) / 1000000.0;
printf("sum = %f, used time = %.4fseconds\n", 4 * sum, elapsed);
}
void *getSum_mutex(void *rank){
long my_rank = (long)rank;
double factor, my_sum = 0.0;
long long i;
long long my_n = n / thread_count;
long long my_first_i = my_n * my_rank;
long long my_last_i = my_first_i + my_n;
if (my_first_i % 2 == 0)
factor = 1.0;
else
factor = -1.0;
for (i = my_first_i; i < my_last_i; i++, factor = -factor)
{
my_sum += factor / (2 * i + 1);
}
// Use Mutexes to solve critical sections after loop
pthread_mutex_lock(&mutex);
sum += my_sum;
pthread_mutex_unlock(&mutex);
return NULL;
}
void test_mutex()
{
struct timeval start;
struct timeval end;
// Use long in case of 64-bit system
long thread;
pthread_t *thread_handles;
gettimeofday(&start, NULL);
// Get number of threads from command line
thread_handles = malloc(thread_count * sizeof(pthread_t));
// -------initialize Mutex
pthread_mutex_init(&mutex, NULL);
for (thread = 0; thread < thread_count; thread++)
{
// Create threads
pthread_create(&thread_handles[thread], NULL, getSum_mutex, (void *)thread);
}
for (thread = 0; thread < thread_count; thread++)
{
// Wait until thread_handles[thread] complete
pthread_join(thread_handles[thread], NULL);
}
gettimeofday(&end, NULL);
free(thread_handles);
//----------destroy mutex
pthread_mutex_destroy(&mutex);
long long startusec = start.tv_sec * 1000000 + start.tv_usec;
long long endusec = end.tv_sec * 1000000 + end.tv_usec;
double elapsed = (double)(endusec - startusec) / 1000000.0;
printf("sum = %f, used time = %.4fseconds\n", 4 * sum, elapsed);
}
void *getSum_semaphore(void *rank)
{
long my_rank = (long)rank;
double factor, my_sum = 0.0;
long long i;
long long my_n = n / thread_count;
long long my_first_i = my_n * my_rank;
long long my_last_i = my_first_i + my_n;
if (my_first_i % 2 == 0)
factor = 1.0;
else
factor = -1.0;
for (i = my_first_i; i < my_last_i; i++, factor = -factor)
{
my_sum += factor / (2 * i + 1);
}
// Use semaphore to solve critical sections after loop
sem_wait(&sem);
sum += my_sum;
sem_post(&sem);
return NULL;
}
void test_semaphore(){
struct timeval start;
struct timeval end;
// Use long in case of 64-bit system
long thread;
pthread_t *thread_handles;
gettimeofday(&start, NULL);
// Get number of threads from command line
thread_handles = malloc(thread_count * sizeof(pthread_t));
// -------initialize semaphore
sem_init(&sem, 0, 1);
for (thread = 0; thread < thread_count; thread++)
{
// Create threads
pthread_create(&thread_handles[thread], NULL, getSum_semaphore, (void *)thread);
}
for (thread = 0; thread < thread_count; thread++)
{
// Wait until thread_handles[thread] complete
pthread_join(thread_handles[thread], NULL);
}
gettimeofday(&end, NULL);
free(thread_handles);
sem_destroy(&sem);
long long startusec = start.tv_sec * 1000000 + start.tv_usec;
long long endusec = end.tv_sec * 1000000 + end.tv_usec;
double elapsed = (double)(endusec - startusec) / 1000000.0;
printf("sum = %f, used time = %.4fseconds\n", 4 * sum, elapsed);
}
5.openmp
5.1 parallel
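原笔记此节未展开,这里补一个最小示例(自拟):parallel for 配合 reduction 子句做并行求和。
#include <stdio.h>
#include <omp.h>
// gcc -fopenmp omp_sum.c -o omp_sum(文件名仅为示例)
int main(void)
{
    const int n = 1000000;
    double sum = 0.0;
    // parallel for:把循环迭代分给各线程执行
    // reduction(+:sum):每个线程累加自己的私有副本,循环结束后自动归约到 sum
    #pragma omp parallel for num_threads(4) reduction(+ : sum)
    for (int i = 1; i <= n; i++)
        sum += 1.0 / i;          // 调和级数部分和,仅作演示
    printf("sum = %f\n", sum);
    return 0;
}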
5.2 奇偶冒泡排序
#include <stdio.h>
#include <stdlib.h>
#include <omp.h>
// 注:按 C++ 编译(如 g++ -fopenmp);const int N 作为数组长度且带初始化在 C 中不合法
const int N = 10;
int main()
{
int i,temp,phase;
int a[N] = {10, 8, 79, 55, 13, 2, 45, 68, 11, 7};
# pragma omp parallel num_threads(4) default(none) shared(a, N) private(i, temp, phase)
/*num_threads(4):明确使用4个线程,可以另作修改
default(none) shared(a,N):明确共享的变量,以随时更新所需的公有数据
private(i,tmp,phase):确定每个线程的私有变量,避免线程间数据混淆冲突
*/
for(phase = 0; phase < N; phase++)
{
if(phase % 2 == 0)
{
# pragma omp for
for(i = 1; i< N; i += 2)
{
if(a[i-1] > a[i])
{
temp = a[i-1];
a[i-1] = a[i];
a[i] = temp;
}
}
}
else
{
# pragma omp for
for(i = 1; i < N-1; i += 2)
{
if(a[i] > a[i+1])
{
temp = a[i+1];
a[i+1] = a[i];
a[i] = temp;
}
}
}
}
for(i = 0; i < N; i++)
{
printf("%d ",a[i]);
}
return 0;
}
5.3 数据依赖
修改下面的代码,保证正确性:
//sum
#pragma omp parallel for
for(int i=1;i<n;i++){
x[i]+=x[i-1];
}
x[i] 依赖于 x[i-1](循环携带依赖,loop-carried dependence),直接加 parallel for 结果不正确。
尝试:
1. ordered 子句:保证该段按迭代顺序执行,结果正确但基本退化为串行;
2. 分块两遍计算前缀和:先各线程算块内前缀和,再把前面各块的总和加回来(见下面的草图)。
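思路 2 的一个草图(自拟示例,分块两遍法求前缀和):第一遍各线程对自己的块做局部前缀和;再串行累出每块之前的总和(偏移量);第二遍各线程把偏移量加回本块的每个元素。
#include <stdio.h>
#include <stdlib.h>
#include <omp.h>
// 两遍法并行前缀和:结束后 x[i] = 原 x[0] + x[1] + ... + x[i]
void parallel_prefix_sum(double *x, int n)
{
    int nthreads;
    double *offset; // offset[t]:线程 t 负责的块之前所有元素的总和
    #pragma omp parallel
    {
        int t = omp_get_thread_num();
        #pragma omp single
        {
            nthreads = omp_get_num_threads();
            offset = calloc(nthreads + 1, sizeof(double));
        } // single 末尾有隐式 barrier,保证 offset 已分配好
        int chunk = (n + nthreads - 1) / nthreads;
        int lo = t * chunk;
        int hi = (lo + chunk < n) ? lo + chunk : n;
        for (int i = lo + 1; i < hi; i++)   // 第一遍:块内局部前缀和
            x[i] += x[i - 1];
        if (hi > lo)
            offset[t + 1] = x[hi - 1];      // 记录本块总和
        #pragma omp barrier
        #pragma omp single
        for (int j = 1; j <= nthreads; j++) // 串行累出每块的偏移量
            offset[j] += offset[j - 1];
        for (int i = lo; i < hi; i++)       // 第二遍:加上前面所有块的总和
            x[i] += offset[t];
    }
    free(offset);
}
int main(void)
{
    double x[10];
    for (int i = 0; i < 10; i++) x[i] = 1.0;
    parallel_prefix_sum(x, 10);
    for (int i = 0; i < 10; i++) printf("%.0f ", x[i]); // 期望输出 1 2 ... 10
    printf("\n");
    return 0;
}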
6.GPU
物理组织:GPC,TPC,SM,warp,SP
逻辑结构:grid,block,thread
物理结构:
•GPU的结构中分为流处理器阵列(SPA,现在为GPC)和存储器系统两部分。
•SPA分为两层,第一层为线程处理器群(TPC),第二层是每个TPC中若干个SM。
•SM(Streaming Multiprocessor)相当于一个具有 32 路 SIMD 的处理器,每个 SM 中含有 32 个 core(早期曾被称为 SP,Streaming Processor)。
内存结构
逻辑结构:
逻辑结构与物理结构对应:
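逻辑结构可以用一个最小的 CUDA 向量加法示例来说明(自拟示例):kernel 以 grid 为单位启动,grid 由若干 block 组成,block 由若干 thread 组成,每个 thread 用 blockIdx、blockDim、threadIdx 计算出自己负责的元素下标。
#include <cstdio>
#include <cuda_runtime.h>
// 每个 thread 处理一个元素
__global__ void vec_add(const float *a, const float *b, float *c, int n)
{
    int i = blockIdx.x * blockDim.x + threadIdx.x; // 全局线程编号
    if (i < n)
        c[i] = a[i] + b[i];
}
// nvcc vec_add.cu -o vec_add(文件名仅为示例)
int main()
{
    const int n = 1 << 20;
    size_t bytes = n * sizeof(float);
    float *a, *b, *c;
    cudaMallocManaged(&a, bytes);   // 统一内存:主机和设备都可访问
    cudaMallocManaged(&b, bytes);
    cudaMallocManaged(&c, bytes);
    for (int i = 0; i < n; i++) { a[i] = 1.0f; b[i] = 2.0f; }
    int threadsPerBlock = 256;      // 每个 block 的线程数,一般取 warp 大小(32)的倍数
    int blocksPerGrid = (n + threadsPerBlock - 1) / threadsPerBlock;
    vec_add<<<blocksPerGrid, threadsPerBlock>>>(a, b, c, n);
    cudaDeviceSynchronize();        // 等待 kernel 执行完成
    printf("c[0] = %f\n", c[0]);    // 期望输出 3.000000
    cudaFree(a); cudaFree(b); cudaFree(c);
    return 0;
}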