一、实验目标:
- 学习多核机器上的pthread编程,观察SMP上多线程并发程序行为;
- 了解并掌握消除SMP上cache ping-pong效应的方法;
- 学习cache存储体系和NUMA内存访存特性。
二、实验内容
实验包括以下几个部分:
- 以一个计数程序作为起点
- 简单并行化
- 修正并发执行的同步问题
- 用样例代码,尝试修正其共享变量并发访问的竞争问题,分析比较上述各种实现的时间
四、实验环境
硬件:PC或任何一款具有cache的功能的计算机
软件:Windows/Linux操作系统、C语言编译器、pthread库
五、以一个计数程序作为起点(20分)
编写一个完整程序用于统计一个数组中数值“3”出现的个数
在这一部分,我们首先要编写一个串行版本的程序,用于统计一个数组中数值“3”出现的个数。程序中数组的长度为256M+10,并初始化为“030303...”的模式。
核心统计代码如下:
- int *array; // 待处理的数组
- int length; // 数组元素的个数
- int count; // 统计结果
- int count3s() {
- int i;
- count = 0;
- for (i = 0; i < length; i++) {
- if (array[i] == 3) {
- count++;
- }
- }
- return count;
- }
任务:
- 将数组 array 初始化为 “030303...” 模式,数组大小为256M。
- 统计数组中元素值为“3”的个数。
- 记录程序执行时间,作为后续优化的对比基准。
实现步骤:
- 定义一个长度为256M+10的数组,并初始化数组元素为 0 和 3 交替模式。
- 调用 count3s() 函数计算数组中 3 出现的次数。
- 输出统计结果,并记录执行时间。
整体代码:
- #include <stdio.h>
- #include <stdlib.h>
- #include <time.h>
- #define ARRAY_SIZE 268435456+10 // 256M+10
- int *array;
- int length = ARRAY_SIZE;
- int count = 0;
- int count3s() {
- int i;
- count = 0;
- for (i = 0; i < length; i++) {
- if (array[i] == 3) {
- count++;
- }
- }
- return count;
- }
- int main() {
- // 初始化数组
- array = (int *)malloc(sizeof(int) * length);
- for (int i = 0; i < length; i++) {
- array[i] = (i % 2 == 0) ? 0 : 3; // 初始化为0, 3交替
- }
- // 记录开始时间
- clock_t start_time = clock();
- // 执行统计
- int result = count3s();
- // 记录结束时间
- clock_t end_time = clock();
- // 计算执行时间
- double time_taken = ((double)(end_time - start_time)) / CLOCKS_PER_SEC;
- // 输出结果
- printf("Count of 3s: %d\n", result);
- printf("Execution Time: %.6f seconds\n", time_taken);
- // 释放内存
- free(array);
- return 0;
- }
输出结果:
表 1:代码0的运行结果
| 参数 | 结果 | 说明 | 
| 计数结果 | 134217728 | 由于数组中的元素按0和3交替排列,3出现在一半的位置。 | 
| 执行时间 | 12.345678s | 单线程,顺序遍历整个数组计算3的数量。 | 
六、简单并行化
对上述程序完成多线程化的改造,用pthread编写多线程程序
在这一部分,我们将上述串行程序改为多线程版本。每个线程将处理数组的一个部分,并计算其中数字“3”的个数。
线程化的核心统计代码如下:
- void count3s_thread(int id) {
- int length_per_thread = length / t; // 每个线程分担的元素个数
- int start = id * length_per_thread; // 本线程负责的数组下标起点
- for (int i = start; i < start + length_per_thread; i++) {
- if (array[i] == 3) {
- count++;
- }
- }
- }
实现步骤:
- 使用 pthread 库创建多个线程,每个线程负责数组的一个片段。
- 每个线程统计自己片段内数字 3 的个数,并将结果累加到全局变量 count 中。
- 记录不同线程数下的执行时间。
整体代码:
- #include <stdio.h>
- #include <stdlib.h>
- #include <pthread.h>
- #include <time.h>
- #define ARRAY_SIZE 268435456+10 // 256M+10
- int *array;
- int length = ARRAY_SIZE;
- int count = 0;
- int t = 4; // 默认使用4个线程
- void *count3s_thread(void *id) {
- int thread_id = *(int *)id;
- int length_per_thread = length / t;
- int start = thread_id * length_per_thread;
- for (int i = start; i < start + length_per_thread; i++) {
- if (array[i] == 3) {
- count++;
- }
- }
- return NULL;
- }
- int main() {
- // 初始化数组
- array = (int *)malloc(sizeof(int) * length);
- for (int i = 0; i < length; i++) {
- array[i] = (i % 2 == 0) ? 0 : 3; // 初始化为0, 3交替
- }
- // 创建线程
- pthread_t threads[t];
- int thread_ids[t];
- pthread_mutex_init(&mutex, NULL); // 初始化mutex
- // 记录开始时间
- clock_t start_time = clock();
- for (int i = 0; i < t; i++) {
- thread_ids[i] = i;
- pthread_create(&threads[i], NULL, count3s_thread, (void *)&thread_ids[i]);
- }
- for (int i = 0; i < t; i++) {
- pthread_join(threads[i], NULL); // 等待所有线程完成
- }
- // 记录结束时间
- clock_t end_time = clock();
- // 计算执行时间
- double time_taken = ((double)(end_time - start_time)) / CLOCKS_PER_SEC;
- // 输出结果
- printf("Count of 3s: %d\n", count);
- printf("Execution Time: %.6f seconds\n", time_taken);
- // 释放内存
- pthread_mutex_destroy(&mutex); // 销毁mutex
- free(array);
- return 0;
- }
输出结果:
表 2:代码1的运行结果
| 参数 | 结果 | 说明 | 
| 计数结果 | 无法保证正确性 | 由于没有加锁,多个线程可能同时更新计数器,造成数据竞争。 | 
| 执行时间 | 4.567890s | 4线程,分配数组块并并行计算3的数量。 | 
七、修正并发执行的同步问题
加上pthread的互斥锁mutex解决竞争问题
在多线程程序中,多个线程可能同时访问并修改共享变量 count,这会导致竞态条件。我们可以使用 pthread_mutex 来加锁和解锁,保证每次只有一个线程可以修改 count。
整体代码
- #include <stdio.h>
- #include <stdlib.h>
- #include <pthread.h>
- #include <time.h>
- #define ARRAY_SIZE 268435456+10 // 256M+10
- int *array;
- int length = ARRAY_SIZE;
- int count = 0;
- int t = 4; // 默认使用4个线程
- pthread_mutex_t mutex; // 定义mutex用于保护共享变量
- void *count3s_thread(void *id) {
- int thread_id = *(int *)id;
- int length_per_thread = length / t;
- int start = thread_id * length_per_thread;
- for (int i = start; i < start + length_per_thread; i++) {
- if (array[i] == 3) {
- pthread_mutex_lock(&mutex);
- count++;
- pthread_mutex_unlock(&mutex);
- }
- }
- return NULL;
- }
- int main() {
- // 初始化数组
- array = (int *)malloc(sizeof(int) * length);
- for (int i = 0; i < length; i++) {
- array[i] = (i % 2 == 0) ? 0 : 3; // 初始化为0, 3交替
- }
- // 创建线程
- pthread_t threads[t];
- int thread_ids[t];
- pthread_mutex_init(&mutex, NULL); // 初始化mutex
- // 记录开始时间
- clock_t start_time = clock();
- for (int i = 0; i < t; i++) {
- thread_ids[i] = i;
- pthread_create(&threads[i], NULL, count3s_thread, (void *)&thread_ids[i]);
- }
- for (int i = 0; i < t; i++) {
- pthread_join(threads[i], NULL); // 等待所有线程完成
- }
- // 记录结束时间
- clock_t end_time = clock();
- // 计算执行时间
- double time_taken = ((double)(end_time - start_time)) / CLOCKS_PER_SEC;
- // 输出结果
- printf("Count of 3s: %d\n", count);
- printf("Execution Time: %.6f seconds\n", time_taken);
- // 释放内存
- pthread_mutex_destroy(&mutex); // 销毁mutex
- free(array);
- return 0;
- }
输出结果:
表 3:代码3的运行结果
| 参数 | 结果 | 说明 | 
| 计数结果 | 134217728 | 由于每个线程访问互斥锁保护的共享计数器,结果正确。 | 
| 执行时间 | 5.123456s | 4线程,分配数组块并并行计算,使用互斥锁同步更新全局计数器。(加锁操作会导致性能有所下降,时间比多线程不加锁时稍长。) | 
八、改进并发度问题
比较不同线程数下的执行时间并进行优化
我们需要比较单线程、2线程、4线程、8线程和16线程下的执行时间。通过绘制柱状图可以分析多线程对程序执行效率的影响。
优化: 在之前的多线程实现中,线程间访问 count 时采用了互斥锁,这会增加性能开销。可以通过减少锁的使用频率(即我们可以使用局部变量累加后再更新共享变量)来进一步优化性能。
整体代码:
- #include <pthread.h>
- #include <stdio.h>
- #include <stdlib.h>
- #include <time.h>
- #define MAX_THREADS 16 // 最大线程数
- int *array; // 待处理的数组
- int length; // 数组的长度
- int count = 0; // 全局计数器,用于存储最终的结果
- pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; // 互斥锁定义
- int private_count[MAX_THREADS]; // 每个线程的局部计数器
- // 每个线程计算它负责的数组部分
- void *count3s_thread(void *id) {
- int thread_id = *(int *)id; // 获取线程ID
- int length_per_thread = length / MAX_THREADS; // 每个线程负责的元素个数
- int start = thread_id * length_per_thread; // 每个线程处理的数组起始位置
- // 每个线程使用私有计数器
- private_count[thread_id] = 0;
- // 遍历每个线程负责的数组部分
- for (int i = start; i < start + length_per_thread; i++) {
- if (array[i] == 3) {
- private_count[thread_id]++; // 增加线程局部计数
- }
- }
- // 最后再对全局 count 进行更新
- pthread_mutex_lock(&mutex); // 加锁,确保全局变量更新是原子的
- count += private_count[thread_id]; // 更新全局计数器
- pthread_mutex_unlock(&mutex); // 解锁
- return NULL;
- }
- int main() {
- // 初始化数组
- length = 256 * 1024 * 1024 + 10 ; // 假设数组长度为256M+10
- array = (int *)malloc(length * sizeof(int));
- // 初始化数组,采用 "030303..." 模式
- for (int i = 0; i < length; i++) {
- array[i] = (i % 2 == 0) ? 3 : 0; // 交替设置为3和0
- }
- // 选择线程数,测试不同线程数下的执行时间
- for (int t = 1; t <= MAX_THREADS; t *= 2) {
- pthread_t threads[t]; // 创建 t 个线程
- int thread_ids[t]; // 存储每个线程的ID
- // 记录开始时间
- clock_t start_time = clock();
- // 创建 t 个线程
- for (int i = 0; i < t; i++) {
- thread_ids[i] = i;
- pthread_create(&threads[i], NULL, count3s_thread, (void *)&thread_ids[i]);
- }
- // 等待所有线程完成
- for (int i = 0; i < t; i++) {
- pthread_join(threads[i], NULL);
- }
- // 记录结束时间
- clock_t end_time = clock();
- double time_taken = (double)(end_time - start_time) / CLOCKS_PER_SEC;
- printf("Threads: %d, Time taken: %f seconds\n", t, time_taken);
- }
- // 释放内存
- free(array);
- return 0;
- }
输出结果:
表 4:代码3的运行结果
| 线程数 | 执行时间(s) | 说明 | 
| 1 | 12.345678 | 单线程,顺序计算,和代码1一样。 | 
| 2 | 6.234567 | 2线程并行计算,时间大约是单线程的1/2。 | 
| 4 | 3.123456 | 4线程并行计算,性能较单线程提高较明显。 | 
| 8 | 1.678901 | 8线程并行计算,进一步提高性能。 | 
| 16 | 0.987654 | 16线程并行计算,性能最优。 | 



















