文章目录
- 18. 并发与多线程篇
- 说在前面
- (1).线程和进程
- (2).并发和并行
- (3).thread(C++11)
- #1.thread库与thread类
- #2.join和detach方法
- #3.id类和get_id方法
- #4.this_thread和系列操作
- (4).原子操作
- #1.为什么是原子?
- #2.为什么需要原子操作?
- #3.atomic库
- (5).竞态条件
- (6).线程安全与锁
- #1.互斥锁
- i.std::mutex
- ii.std::shared_mutex
- #2.自旋锁
- (7).基于RAII的全新互斥锁
- (8).Peterson算法
- #1.访问意向
- #2.严格轮换法
- #3.Peterson算法的真实实现
- #4.真的没问题吗?
- (9).信号量
- #1.信号量是什么
- #2.标准库semaphore(C++20)
- #3.pthread.h和semaphore.h
- #4.经典同步/互斥问题
- (10).条件变量
- #1.条件变量是什么
- #2.std::condition_variable
- #3.线程安全队列的实现
- 小结
- 参考资料
18. 并发与多线程篇
发现了吗,大标题从面向对象与C++基础变成面向对象与C++进阶了,其实前面两篇已经应该改了,毕竟最近几篇研究的这些东西好像都已经不是C++基础涉及的内容了呢。
这次就让我们来谈谈更高性能的C++吧!我们现在有这么一个问题:快速排序是一个分治算法,它的思路是这样:首先选取一个基准值(pivot),把比基准值小的放左边,比基准值大的放右边,然后得到左右两个子数组,我们再对这两个子数组进行相同的排序操作,直到最后排到只有两个数字的时候,就可以返回结果了,代码如下:
#include <iostream>
#include <vector>
#include <random>
template<typename T>
int partition(std::vector<T>& array, int low, int high) {
T pivot{ array[high] };
int i = low - 1;
for (int j = low; j < high; ++j) {
if (array[j] < pivot) {
i++;
std::swap(array[i], array[j]);
}
}
std::swap(array[i + 1], array[high]);
return i + 1;
}
template<typename T>
void quickSort(std::vector<T>& array, int low, int high) {
if (low < high) {
int pivotIndex = partition(array, low, high);
quickSort(array, low, pivotIndex - 1); // 排序左边部分
quickSort(array, pivotIndex + 1, high); // 排序右边部分
}
}
int main() {
std::vector<int> data;
std::uniform_int_distribution<int> uniform_distribution{ 1, 100 }; // 调用均匀分布
std::default_random_engine e{}; // random库标准随机引擎
for (int i = 0; i < 30; i++) {
data.push_back(uniform_distribution(e));
}
for (auto& u : data) {
std::cout << u << " ";
}
std::cout << std::endl;
quickSort(data, 0, data.size() - 1);
for (auto& u : data) {
std::cout << u << " ";
}
std::cout << std::endl;
return 0;
}
这次我们采用random库的uniform_int_distribution来产生一组服从均匀分布的随机整数,然后进行排序,
结果是如我们所料的,正确地完成了排序,不过你可能会想,这个排序过程中我们先排左边,后排右边,实际上两个部分是互不干扰的,如果我们能让左边和右边同时排序,这么做下去,貌似是可以提升性能的,所以我们可以写出这么一个新的多线程版快速排序:
template<typename T>
void quickSort(std::vector<T>& array, int low, int high) {
if (low < high) {
int pivotIndex = partition(array, low, high);
auto left_func(std::bind(quickSort<T>, std::ref(array), low, pivotIndex - 1));
auto right_func(std::bind(quickSort<T>, std::ref(array), pivotIndex + 1, high));
std::thread left(left_func); // 排序左边部分
std::thread right(right_func); // 排序右边部分
left.join();
right.join();
}
}
只有快速排序变了,这里我们用std::bind分别绑定左右两边的排序,然后创建left和right两个线程分别排序左边和右边的部分,最后再分别等待left和right排序完毕再结束函数,这样确实是并发了,但是你试试把随机数的数量改大一点,这个办法排序的速度远远慢于单线程的版本,这是为什么??
实际上,创建线程和销毁线程对象也是有开销的,当我们考虑采取更加高级的东西来提高性能的时候,我们也要考虑,这个东西本身的诞生是否也会产生额外的开销;如果涉及到比较频繁的线程创建与释放过程,我们可能会采取线程池(Thread Pool) 来管理一批线程,从而减少以上过程带来的额外性能开销。
感觉不太好理解,我们举个例子:你在做饭的时候,需要用到多种调味料,它们都是用广口瓶装的,你如果要用其中某种调味料,你需要把盖子扭开,舀一勺调味料,然后把盖子再合上;如果这个过程只做一到两次,你可能觉得打开了就关上其实效率没多大变化,但是如果你是个厨师,出餐的速度有需求,而你又要经常使用调味料,每次打开罐子就关上实际上会严重影响效率,假设每次加调味料要0.5s,开关罐子共要3s,那么每次都开关消耗3.5s,这么做100次就需要350s,而如果只开关一次,那么加一百次调味料就只需要53s,这个优化可以说是相当大了。
这也就是为什么,我们可能需要采取线程池的方式来优化这种场景下的线程使用,当然,这个实际上是后话了,这好像还是我第一次在正文没开始的时候写这么多东西哈哈哈哈,那就让我们正式开始吧!
说在前面
虽然说这一篇博客是C++系列当中介绍并发和多线程的,但为了完整性的考虑,本篇更多的内容会在介绍真实世界的并发编程的实现,C++的<thread>等内容只是作为实现这些方法或流程的基本工具。
想要真正理解并发和多线程编程,你可能需要一定的操作系统以及并行计算课程的基础,其实这一篇博客从2023年9月就开始写了,但是直到这个时候才发出来,其实本质原因就是,在我刚开始写这个系列博客的时候对于并发编程还只有朴素认知,越写越发现自己没有办法很好地完成这一部分,于是一直拖到修完操作系统课hhhhh,既然我前面已经发了很多操作系统中关于并发的内容了,只要你有时间读过那些内容,这篇博客的内容对你来说应该就会比较简单了。
对于多线程的一个朴素认知是:开一个线程,它就会依照给它的函数去执行,我们可以以此为基础提高程序的运行效率,但真实的情况并不是这样,在前面我们举的并发快速排序的例子中也体现出来了,一个更加真实的理解是:创建一个线程之后,我们会在这个程序中创建一个可以被操作系统/CPU调度的基本单位,它有可能与别的线程并行,也可能不是并行;它有可能比后续创建的线程优先执行,也可能比后续创建的线程后执行,非常遗憾的是,我们不能对并发编程作出任何假设,例如下面这个例子:
#include <iostream>
#include <thread>
using namespace std;
int a = 10;
void thread1()
{
a = 20;
}
void thread2()
{
a = 30;
}
int main()
{
for (int i = 0; i < 20; i++) {
a = 10;
thread t1{ thread1 }, t2{ thread2 };
t1.join();
t2.join();
cout << a << endl;
}
return 0;
}
a的值最终会是多少呢?不确定,你可以试试看,输出的结果20和30都有:
究其本质,实际上是操作系统在线程调度上的不确定性,我们不能假定调度顺序或是执行顺序,因此才会出现一大堆互斥与同步的问题,这也是我们在这一篇博客当中会着重利用C++解决的问题。
下面的几个部分会首先介绍线程和进程、并发和并行等几个基本概念,一定要先理解它们哦。
(1).线程和进程
所以既然要讲多线程,那首先我们就要知道什么是线程,线程的一个典型定义是这样的:线程是进程中的一个独立执行单元,是调度的最小单元。一个进程可以包含多个线程,它们共享相同的上下文和资源。线程之间的切换比进程之间的切换要快,因为线程共享相同的地址空间,不需要切换地址空间。
而进程(Process)的一个典型定义是这样:进程是程序执行的一个实例。一个进程可以包含多个线程,每个线程执行不同的任务,但它们共享相同的资源,如内存空间和文件。每个进程都有自己的地址空间,是独立的执行单元。
简单来说,你在使用Visual Studio编完一个程序之后,点击运行,通过编译之后产生的程序运行起来就会启动一个进程,这个进程具备操作系统分配的一串内存空间,它本身是作为程序执行的一个实例存在;而线程则是进程中的一个小单元,在我们不采取多线程的编程模式时,整个程序进程中就只存在一个主线程,程序的生命周期与主线程保持一致。
因此在这一篇中,我们会提到怎么采取在一个进程中创建多个线程来提高程序执行的效率,毕竟多进程的通信是一个相对比较复杂的问题,它可能需要使用消息队列、管道、共享内存、信号以及套接字(网络) 的方法来完成,听起来要困难不少,而多线程程序保证多个线程共享内存空间,这样一来我们只要保证不出现访问冲突,通信就是一件比较容易的事情了。
还有一个需要注意的事情:多线程的应用程序每个线程的执行顺序是不确定的,比如你创建了th1, th2, th3三个线程并让它们执行相同的操作,你其实不能确定究竟哪个线程会率先完成任务,这对于我们之后涉及到的内存管理是不太有利的,不过还是有一些缓解的方案。
(2).并发和并行
并发和并行也是两个比较常见的说法,但是这二者不是对立的。
并发是指多个任务在同一时间段内执行,并且共享CPU时间,它通过一定的调度算法让多个任务看起来是同一时间执行的。
并发并没有强调说多个任务一定是同时执行的,如果你对Python的多线程有所了解,你可能知道一个叫做GIL(Global Interpreter Lock,全局解释器锁)的东西:
它保证一个解释器只具备一把GIL,但这样也会使得Python的多线程在同一时间只有一个线程能够获取GIL,也就是说,Python的多线程不能真正提高运行效率,不过貌似Python 3.12已经在尝试改进GIL的一些性质,如果你有兴趣也可以去研究一下。
并行就比较好理解了,它指的是多个任务在同一时刻真正同时执行,例如你打开你的任务管理器,在CPU那儿选择逻辑处理器,如果在不同的逻辑处理上运行不同的程序,那这就是真正的同时执行了。
(3).thread(C++11)
接下来就是比较核心的内容了,虽然你可能会发现,这一节之后还有九节,但是我们如果连线程都不知道怎么创建,那还是比较难继续进行下去的。
#1.thread库与thread类
C++在C++11前标准库中没有统一的多线程库,在Windows下需要通过Windows API来完成多线程操作,而在Linux/Unix下则需要通过pthread.h来完成多线程操作,这样做其实是很麻烦的,因为不同平台下需要编写不同的代码,虽然C++的代码跨平台性也不咋地吧,但是好歹也是需要在不同平台下重新编译即可,多线程需要重新编写代码,这个就很麻烦了。
所以,C++11统一了操作,在标准库中引入了<thread>头文件,你只需要
#include <thread>
就可以开始使用多线程了!不过Linux下还是和使用pthread一样需要在编译的时候加上-lpthread才能正常使用。在thread库中有一个thread类用于创建线程,它的一系列构造函数如下:
thread() noexcept;
thread( thread&& other ) noexcept;
template< class Function, class... Args >
explicit thread( Function&& f, Args&&... args );
线程可以无参构造、右值引用构造或者传入函数以及全部参数进行构造,这里的函数是广义的函数,包括函数指针、std::bind对象、std::functional对象以及lambda表达式都可以作为参数传入,不过这个参数的类型一定要与函数参数的类型匹配,包括引用和常引用都要原模原样地传入,具体方法如下:
#include <iostream>
#include <functional>
#include <thread>
struct A {};
int f(const A& a) { return 1; }
int g(A& a) { return 2; }
int main()
{
A a;
// 通过std::ref()和std::cref()直接传入构造函数
std::thread th1{ f, std::cref(a) }; // std::cref()传入常量引用
std::thread th2{ g, std::ref(a) }; // std::ref()传入引用
th1.detach();
th2.detach();
// 先创建std::bind对象,再创建线程
auto std::bind th3_func{ f, std::cref(a) }; // 创建std::bind对象,传入std::cref()
auto std::bind th4_func{ g, std::ref(a) }; // 创建std::bind对象,传入std::ref()
std::thread th3{ th3_func };
std::thread th4{ th4_func };
th3.detach();
th4.detach();
return 0;
}
就是这样,我们直接传入函数和对应的参数,就可以创建线程对象了,线程对象一经创建就会立刻开始执行,不需要通过方法手动启动。
#2.join和detach方法
这两个方法在多线程的程序里至少要用一个,否则会报错,二者确定了一个线程未来的发展情况,使用join方法的线程会阻塞当前所在的线程,使得当前线程等待对应线程执行完毕后再继续执行;而detach方法则是放养,被执行detach方法的线程的函数执行完毕后会自动释放资源。
std::thread th{f};
f.join();
不过需要注意的一点是detach后的线程并不会被主线程等待,如果主线程的任务提前结束,由于整个程序的中止,detach的线程也会一并被终止掉,这样可能导致你看不到想要的结果,这时候你可能需要让你的主线程等几秒,等到所有线程完成任务。
join和detach都只能被调用一次,我们可以通过joinable()函数来判断是否可以进行join操作:
#include <iostream>
#include <thread>
void f() {}
int main()
{
std::thread th{f};
th.join();
std::cout << std::boolalpha << th.joinable() << std::endl;
return 0;
}
所以只要执行一次join或detach函数之后,joinable的结果都会变成false
#3.id类和get_id方法
thread类中还包含一个成员类id,用于唯一标识一个线程,你可以使用get_id()方法来获取某个线程的id,例如:
#include <iostream>
#include <thread>
#include <vector>
void f() {}
int main()
{
std::vector<std::thread> vec;
for (int i = 0; i < 10; i++) {
vec.emplace_back(std::thread{ f });
std::cout << vec[i].get_id() << std::endl;
vec[i].detach();
}
return 0;
}
就是这样,我们可以直接把id打印出来,注意:未被赋予任务、已结束、已被移动或已被detach的线程的id可以是id{},你可以试试先detach再打印id,结果可能是一片"0"
#4.this_thread和系列操作
有的时候你可能会好奇:我到底是谁呢? 有的时候线程也会想知道这个问题,因此我们有this_thread命名空间下的一系列函数可以用于获取当前线程的参数,也可以执行一些操作:get_id()、sleep_for()、sleep_until()、yield(),这四个函数对于一般的线程对象也是可以直接用的,在这里就不过多展示了
首先是get_id(),获取当前线程的id,这个可以用来定位,例如:
#include <iostream>
#include <thread>
#include <vector>
void f()
{
std::cout << "Current thread: " << std::this_thread::get_id() << std::endl;
}
int main()
{
std::vector<std::thread> vec;
for (int i = 0; i < 3; i++) {
vec.emplace_back(std::thread{ f });
vec[i].detach();
}
std::cout << "Main thread: " << std::this_thread::get_id() << std::endl;
return 0;
}
这个函数的执行结果可能就不会像你想的那么好了:
这只是一种情况,事实上,每次你执行这段代码结果都不一样,这其实也是多线程程序最容易出现的问题:不同线程的执行顺序是随机的,你不能凭借几乎相同的线程创建时间就认为它们的执行也是依照这个顺序完成的,这时候我们只要把detach换成join,就正常了:
sleep_for()和sleep_until()分别是暂停当前线程xx时间以及暂停当前线程直到xx时间,例如:
#include <iostream>
#include <thread>
#include <chrono>
int main()
{
std::this_thread::sleep_for(std::chrono::seconds((3)));
return 0;
}
我们需要用到chrono库中对于时间的定义,这里使得主线程暂停3秒,3s后,程序就自动结束了,sleep_until要稍微复杂一些,我们需要传入一个固定的时间点,这里就不再举例了。
yield()则是让出当前线程的资源,使得程序重新调度线程的执行,这一部分让出的资源允许被别的线程使用;好了,std::thread的基本情况就介绍到这里了,你可能会觉得,还挺简单的,操作很少嘛,确实,但是这并不代表着多线程的程序就有这么简单了,因为还有一大堆线程安全相关的问题在等着我们呢
(4).原子操作
#1.为什么是原子?
古希腊哲学家德谟克利特认为万物的本源就是原子和虚空,原子是最后的不可分的物质微粒,并且用ἄτομος这个词来表示原子,意思就是不可再分,虽然到现在我们知道原子还是可继续被切分为原子核和电子,但是我们这不是个核物理教程,计算机科学家们援引ἄτομος的含义,以演化成的英文词汇atom(atomic)作为并发过程中不可再分割的运行过程的名字,我们也可以直接称之为:原子操作。
#2.为什么需要原子操作?
那为什么我们需要原子操作呢?难道操作还会被打断吗?我们再看看前面提到的Python GIL
因为一个解释器只具有一把GIL,所以在一段时间内,只有一个线程能够获取GIL,从而才能运行,但是在图中我们会发现这个线程1实际上有两个时间段都在运行,也就是说,它的操作被打断成了多个部分,并且在不同的时间内执行,如果你接触过计算机网络的话,你可能还记得电路交换中的时分多路复用,过程是差不多的。
那这样,有什么问题吗?好像没有,这主要是我们的操作还没有不可间断的需求,设想这么一个场景:对于一个银行系统,线程A进行了从Alice到Bob之间的一笔转账,金额是100,那应该是Alice扣100的同时Bob也增加100,当然这两件事不可能同时发生,那么假设在Alice扣款100的时候,另一个线程B以并发的形式,暂时占用了资源,线程A在给Bob增加100的时候被暂停了,线程B试图查询Alice的金额和Bob的金额,这时候发现Alice的余额少了,而Bob的余额也没有增加,这就出问题了,虽然我们可能都知道让它继续执行还是会正常的,但就是这样有可能会导致不必要的事情发生。
所以我们需要原子操作,作为一个不可再分的操作,它要么全都发生,要么全都不发生,在C++中,我们有简单的atomic变量以及比较复杂的基于互斥锁实现的原子函数,这一小节里我们先介绍一下基于atomic库实现的原子变量。
#3.atomic库
首先我们需要引入atomic库:
#include <atomic>
其中包含了std::atomic<T>类型,这个T可以是任何满足以下所有要求的类:
- std::is_trivially_copyable<T>::value,可平凡复制类型(包含以下类型:标量类型(算术类型、指针、成员指针、枚举类型),可以简单复制的类,平凡可复制对象的数组
- 平凡大概指的是:不是用户提供的,例如默认的拷贝构造函数等,类的其中没有virtual成员
- std::is_copy_constructible<T>::value,可拷贝构造
- std::is_move_constructible<T>::value,可移动构造
- std::is_copy_assignable<T>::value,可拷贝赋值
- std::is_move_assignable<T>::value,可移动赋值
上面提供的前面的代码会根据类,返回一个布尔值,你可以凭此判断你的类是否符合要求,当然,你也可以直接:
std::atomic<A> s;
如果你的类A满足以上所有要求,那么它就能创建成功,否则程序就会被静态断言中止。
对于内置算术类型,atomic库中有一系列别名:
类型 | 别名 | 模板 |
---|---|---|
bool | atomic_bool | std::atomic<bool> |
char | atomic_char | std::atomic<char> |
unsigned char | atomic_uchar | std::atomic<unsigned char> |
int | atomic_int | std::atomic<int> |
unsigned int | atomic_uint | std::atomic<unsigned int> |
long | atomic_long | std::atomic<long> |
unsigned long | atomic_ulong | std::atomic<unsigned long> |
long long | atomic_llong | std::atomic<long long> |
unsigned long long | atomic_ullong | std::atomic<unsigned long long> |
int8_t | atomic_long | std::atomic<std::int8_t> |
uint8_t | atomic_ulong | std::atomic<std::uint8_t> |
int_least8_t | atomic_int_least8_t | std::atomic<std::int_least8_t> |
uint_least8_t | atomic_uint_least8_t | std::atomic<std::uint_least8_t> |
int_fast8_t | atomic_int_fast8_t | std::atomic<std::int_fast8_t> |
uint_fast8_t | atomic_uint_fast8_t | std::atomic<std::uint_fast8_t> |
上表中列出了一系列常用类型的模板别名,其中intK_t和uintK_t对应的是指定K位的整形,int_leastK_t和uint_leastK_t对应的是最少K位的整形,而int_fastK_t和uint_fastK_t则对应了K位的快速整形,上述的K可以取到8,16,32,64四种。
对应上述的整形,std::atomic<T>模板提供特化的方法:
方法名 | 操作 |
---|---|
fetch_add | 加法 |
fetch_sub | 减法 |
fetch_and | 按位与 |
fetch_or | 按位或 |
fetch_xor | 按位异或 |
对于所有的std::atomic<T>模板,则有以下常用的成员函数:
函数 | 操作 |
---|---|
is_lock_free | 检查原子对象是否免锁 |
store | 原子地以非原子对象替换原子对象的值 |
load | 原子地获取原子对象的值 |
exchange | 原子地替换原子对象的值并获得它先前持有的值 |
wait(C++20) | 阻塞线程直至被提醒且原子值更改 |
notify_one(C++20) | 提醒至少一个在原子对象上的等待中阻塞的线程 |
notify_all(C++20) | 提醒所有在原子对象上的等待中阻塞的线程 |
关于atomic库的细节你可以参阅cppreference,这里不再赘述。
这个exchange方法,是不是有点眼熟呢?还记得我在操作系统实验的锁机制的应用这一篇当中对于xv6内核的自旋锁获取锁操作的代码解读吗?
void
acquire(struct spinlock *lk)
{
push_off(); // disable interrupts to avoid deadlock.
if(holding(lk))
panic("acquire");
#ifdef LAB_LOCK
__sync_fetch_and_add(&(lk->n), 1);
#endif
while(__sync_lock_test_and_set(&lk->locked, 1) != 0) {
#ifdef LAB_LOCK
__sync_fetch_and_add(&(lk->nts), 1);
#else
;
#endif
}
__sync_synchronize();
lk->cpu = mycpu();
}
说实话,这个代码不算很好读,我去掉了注释,因为这里就不提关中断和内存屏障了,这里主要可以看看__sync_lock_test_test_and_set这个函数,对于可能学过操作系统的你来说,这个东西可太熟悉了,它就是我们说的xchg操作,或者叫做TSL(Test and Set Lock)操作,它只做一件事:尝试用传入值放入一个变量中,并且把变量先前的值返回回来,所以我们提到的原子变量的exchange操作,好像就是这个诶,对的,你可以用exchange指令实现一个简单的用户级自旋锁。
(5).竞态条件
老熟人了,竞态条件。它的定义是这样的:如果程序运行顺序的改变会影响最终结果,那么这就是一个竞态条件。好像有点理解了,其实我们可以举个例子:
#include <iostream>
#include <thread>
#include <chrono>
int n{ 2 };
void f1()
{
n += 3;
}
void f2()
{
n *= 3;
}
void test()
{
std::thread th1{ f1 }, th2{ f2 };
std::this_thread::sleep_for(std::chrono::milliseconds(100));
std::cout << n << std::endl;
n = 2;
th1.join();
th2.join();
}
int main()
{
for (int i = 0; i < 20; i++) {
test();
}
return 0;
}
在看结果之前,我们先想想,这段代码的结果应该是什么,如果按照th1,th2的创建顺序,那理论上将这个结果应该是(2+3)*3=15才对,不过事实的确如此吗?
你看,除了15的结果之外,还有一个9,9应该是这么算出来的2*3+3=9,所以说,在不保证互斥访问临界区的情况下,对于同一个变量的同时写入会导致不确定的结果,当然,在Debug模式和Release模式下跑出来的结果可能是不一样的,这个例子只是想让你知道:即便代码里的对象创建顺序有先后,它们的实际运行顺序也是不确定的,而不确定的运行顺序会导致我们的运行结果不同,这就是竞态条件的一种,一般来说,竞态条件有以下四种:
- 并发读写: 多个线程同时读取并修改同一个共享变量
- 写-写冲突: 多个线程同时尝试写入相同的数据
- 读-写冲突: 一个线程在另一个线程写入数据时尝试读取
- 写-读冲突: 一个线程在另一个线程读取数据时尝试写入
想要解决这些问题,我们有一大堆方案,例如我之前操作系统博客当中提到的自旋锁、互斥锁、信号量、条件变量,亦或是一些基于硬件实现的互斥算法,例如Peterson算法,我们会在下面的内容当中完整地介绍这些这些方式的应用。
(6).线程安全与锁
加锁算是一种最轻松的办法,当然,这只是我们认为最轻松的办法,当加锁产生死锁的时候,我们可能要付出十倍的努力来解决这些问题。
#1.互斥锁
在C++标准库的<mutex>头文件中定义了我们非常常用的互斥锁,即std::mutex,而另一种则是定义在<shared_mutex>中的std::shared_mutex
i.std::mutex
std::mutex就是我们非常常见的二元互斥量,我们可以通过下面这个例子简单利用它解决一个问题:100个线程要同时对一个变量sum加100000次3,我们可以写出一个没有互斥机制的代码:
#include <iostream>
#include <thread>
#include <vector>
using namespace std;
constexpr int N{ 100 };
vector<thread> thrs;
int sum = 0;
void thr()
{
for (int i = 0; i < 100000; i++) {
sum += 3;
}
}
void init()
{
for (int i = 0; i < N; i++) {
thrs.push_back(thread{ thr });
}
}
int main()
{
init();
for (auto& t : thrs) {
t.join();
}
cout << "sum == " << sum << endl;
return 0;
}
试试看吧,得到3000w这个正确答案的概率非常小,大概率每次出现的答案都不一样,这就是因为出现了写的冲突,因为我们不能保证一个+=3的操作都是原子的,即A线程读取当前值33,这时候发生调度,B线程打断了A的执行,B也读取值得到33,只要出现了两个线程读到相同值的情况,未来就不可能再有正确的结果了,未来输出的结果是多少都有可能了,所以我们可以用原子变量改造一下这部分代码:
#include <iostream>
#include <thread>
#include <vector>
#include <atomic>
using namespace std;
constexpr int N{ 100 };
vector<thread> thrs;
atomic_int sum = 0;
void thr()
{
for (int i = 0; i < 100000; i++) {
sum.fetch_add(3);
}
}
void init()
{
for (int i = 0; i < N; i++) {
thrs.push_back(thread{ thr });
}
}
int main()
{
init();
for (auto& t : thrs) {
t.join();
}
cout << "sum == " << sum << endl;
return 0;
}
这串代码就可以执行出正确的结果了,这就是一种经典的无锁并发方法,即利用原子变量通过硬件保证的方式完成临界区的访问不出错,但是原子操作不能保护所有的变量,因此我们可能还是需要用锁来保障线程安全,所以另一种利用互斥锁的解决方案是这样:
#include <iostream>
#include <thread>
#include <vector>
#include <mutex>
using namespace std;
constexpr int N{ 100 };
vector<thread> thrs;
int sum = 0;
std::mutex lk;
void thr()
{
lk.lock();
for (int i = 0; i < 100000; i++) {
sum += 3;
}
lk.unlock();
}
void init()
{
for (int i = 0; i < N; i++) {
thrs.push_back(thread{ thr });
}
}
int main()
{
init();
for (auto& t : thrs) {
t.join();
}
cout << "sum == " << sum << endl;
return 0;
}
变化主要在于新建了一个std::mutex类型的变量lk,并且在每个线程进行计算前先获得锁,然后进行计算,在计算结束之后释放锁,虽然运行速度会降低一点,但至少结果是对的。
哇哦,我们好像已经掌握并发编程的真谛了,看到临界区,就在访问前加锁保护就好了,真的是这样吗?我们看另一个例子(生产者—消费者问题):现在有两个线程,一个作为生产者,一个作为消费者,二者共用一个临界区变量,生产者和消费者在一个时间内只有一个能访问临界区,只有生产者生产了内容放入变量中,消费者才能读取,在这个基础上,我们给出生产者和消费者两个线程执行函数的基本定义:
void Tproduce()
{
int time = 100;
while (time--) {
share = rand() % 1000000;
cout << "Produce : " << share << endl;
}
}
void Tconsume()
{
int time = 100;
while (time--) {
cout << "Consume : " << share << endl;
}
}
按照我们的要求,这两个线程应该满足Produce打印一句,Consume打印一句,这么交替进行,但目前给的代码肯定做不到这一点,所以你可能会写出这样的代码:
#include <iostream>
#include <cstdlib>
#include <thread>
#include <mutex>
using namespace std;
int share{ 0 }, cnt{ 1 };
std::mutex lk, produce;
void Tproduce()
{
int time = 100;
while (time--) {
produce.lock();
while (cnt != 0);
produce.unlock();
lk.lock();
share = rand() % 1000000;
cout << "Produce : " << share << endl;
lk.unlock();
produce.lock();
cnt++;
produce.unlock();
}
}
void Tconsume()
{
int time = 100;
while (time--) {
produce.lock();
while (cnt != 1);
produce.unlock();
lk.lock();
cout << "Consume : " << share << endl;
lk.unlock();
produce.lock();
cnt--;
produce.unlock();
}
}
int main()
{
thread p{ Tproduce }, c{ Tconsume };
p.join();
c.join();
return 0;
}
它看着有点扭曲,利用了两个互斥锁,一个保证临界区的互斥访问,还有一个则是保证一个计数变量的互斥访问,因为要同步,cnt为1代表当前临界区已经存入生产结果,此时生产者必须等待,而消费者可以读取,然后你可以尝试运行一下这段代码:它当然很快就会停下来,为什么呢?因为产生了死锁,我们可以设计一个很简单的调度序列:Tproduce首先获取produce锁,尝试读取cnt,此时因为cnt不为0,它会陷入死循环,此时打断Tproduce,调度执行Tconsume,但它的第一步是尝试获取produce锁,完蛋了,produce在Tproduce线程中还没被释放,而且Tproduce在等待Tconsume把cnt变为0,这就是一个经典的死锁,所以加锁对于这种互斥同步问题理论上讲要实现还是有困难的,不是简单加锁就可以完成的,当然,如果你真的还是想用互斥锁实现,也有一个更扭曲的实现方案:
#include <iostream>
#include <cstdlib>
#include <thread>
#include <mutex>
using namespace std;
int share{ 0 }, cnt{ 1 };
std::mutex lk, produce;
void Tproduce()
{
while (true) {
if (produce.try_lock()) {
if (cnt != 0) {
produce.unlock();
continue;
}
else {
lk.lock();
share = rand() % 1000000;
cout << "Produce : " << share << endl;
cnt++;
lk.unlock();
produce.unlock();
}
}
}
}
void Tconsume()
{
while (true) {
if (produce.try_lock()) {
if (cnt != 1) {
produce.unlock();
continue;
}
else {
lk.lock();
cout << "Consume : " << share << endl;
cnt--;
lk.unlock();
produce.unlock();
}
}
}
}
int main()
{
thread p{ Tproduce }, c{ Tconsume };
p.join();
c.join();
return 0;
}
这段代码是可以正确执行的,改成while (true)主要也是因为try_lock在失败情况下会开启下一轮循环,这样会浪费time导致提前某个线程结束,所以这段代码到底做了什么呢?我们仍然利用第二个互斥锁维持一个计数变量的互斥访问,但是获取锁从lock变成try_lock,try_lock会尝试获取一次锁,如果成功返回true,否则返回false,因此我们不断尝试获取锁,如果获取锁成功,就检测一下cnt是否满足需求,满足就完成线程需要的事情,否则就释放锁,然后开启下一轮循环,由于这个实现方案中,锁没有被长期占用,因此不会产生死锁。
当然,你可能发现了:这不就是模拟了一个二元信号量吗? 你是对的,这还真是模拟了一个二元信号量,所以真正解决类似这种生产者—消费者问题的时候,信号量或条件变量会比互斥锁更好用一点,我们会在之后详细介绍信号量和条件变量。
当然,直接调用lock和unlock显得不那么现代化,C++提供了std::lock_guard,利用RAII来自动管理锁的获取和释放,例如:
#include <mutex>
int s = 0;
std::mutex mtx;
void T()
{
const std::lock_guard<std::mutex> lk(mtx);
do_something_to(s);
}
在对象创建时获取锁,在生命周期结束时会自动释放锁,这就是利用RAII完成的锁管理机制,当然,我是更习惯直接用lock和unlock的,因为锁的获取和释放周期有的时候需要更加精细地控制。
ii.std::shared_mutex
std::shared_mutex则是一种允许读时共享,写时独享的互斥锁,即多线程读不产生竞态条件,因此读锁可以共享,但是写与读、写都产生静态条件,因此写锁必须独享。
当然,锁本身不涉及写入和读取的操作,只是std::shared_lock本身同时支持共享和独享两种获取和释放锁的方式:
int val = 0;
std::shared_mutex lk;
void Tread()
{
lk.lock_shared();
cout << val << endl;
lk.unlock_shared();
}
void Twrite()
{
lk.lock();
val = 10;
lk.unlock();
}
利用共享锁你可以比较轻松地解决读者—写者问题,这里暂时先不提了。
#2.自旋锁
自旋锁是什么?很简单,是一种利用忙等待的方式完成获取和释放的锁,举个例子:你坐在电脑前焦急地等待录取结果,这可能需要你一直刷新网页,那么每隔2s,你就刷新一次网页,看一次结果,这样的过程就叫做忙等待,你把你的时间全部用在等待锁上;而还有的人选择订阅某些机构的提醒服务,当结果出来的时候,提醒服务会告诉你:结果出来了,而这段时间你就不用再去电脑前一直守着了,这就是两种不同的机制,前者是简单的等待实现,而后者是基于通知的等待实现,而后者常用于信号量和互斥锁的实现。
你也发现了,自旋锁大部分时间用在尝试获取锁上了,所以自旋锁的忙等待可能会造成相当的资源浪费,但是对于一些锁竞争不严重的场合,用自旋锁也是完全可以的。
但是很遗憾,C++本身没有提供自旋锁,不过没关系,在前面的原子变量部分我们了解了atomic变量有一个exchange方法,即TSL,我们可以用TSL来实现一个简单的自旋锁:
#include <iostream>
#include <thread>
#include <atomic>
#include <vector>
using namespace std;
constexpr int N{ 100 };
vector<thread> thrs;
struct spinlock
{
atomic_bool lock_t;
spinlock()
{
lock_t.store(false);
}
void acquire()
{
while (lock_t.exchange(true));
}
void release()
{
lock_t.store(false);
}
~spinlock() = default;
};
int sum = 0;
spinlock lock;
void thr()
{
lock.acquire();
for (int i = 0; i < 100000; i++) {
sum += 3;
}
lock.release();
}
void init()
{
for (int i = 0; i < N; i++) {
thrs.push_back(thread{ thr });
}
}
int main()
{
init();
for (auto& t : thrs) {
t.join();
}
cout << "sum == " << sum << endl;
return 0;
}
很简单吧,这样就可以实现我们需要的功能了,不过你可以特别关注一下函数thr:
void thr()
{
lock.acquire();
for (int i = 0; i < 100000; i++) {
sum += 3;
}
lock.release();
}
在这里我们是让每个线程做完10w次加法之后再释放锁,因此对于100个线程只会有100次acquire和release,速度还算快;从理论的角度上看,你把acquire和release放在加法的前后,也是完全没问题的:
void thr()
{
for (int i = 0; i < 100000; i++) {
lock.acquire();
sum += 3;
lock.release();
}
}
的确逻辑上没问题,但是这样可能就有1000w次acquire和release操作了,这个CPU占用率会变得相当恐怖,你可以试试看,把数字调小一点,这个例子可以生动地体现出自旋锁究竟不适合做什么,在锁竞争严重的场合,自旋锁的忙等待会浪费非常多CPU资源
(7).基于RAII的全新互斥锁
前面我们提到<mutex>头文件中为std::mutex提供了std::lock_guard从而利用RAII的方式来管理互斥锁的获取与释放,那么RAII机制还可以用来帮助我们完成各种类型锁的独占和共享获取,因此C++提供了std::unique_lock和std::shared_lock来提供类似std::lock_guard的RAII锁管理,例如:
#include <cstdio>
#include <shared_mutex>
#include <thread>
#include <vector>
#include <Windows.h>
constexpr int wcnt{ 2 }, rcnt{ 10 };
std::vector<std::thread> writers, readers;
std::shared_mutex lk;
void Twrite()
{
while (true) {
std::unique_lock ulk(lk);
printf("Twrite\n");
Sleep(1000);
}
}
void Tread()
{
while (true) {
std::shared_lock slk(lk);
printf("Tread\n");
}
}
void init()
{
for (int i = 0; i < wcnt; i++) {
writers.push_back(std::thread{ Twrite });
}
for (int i = 0; i < rcnt; i++) {
readers.push_back(std::thread{ Tread });
}
}
int main()
{
init();
for (int i = 0; i < wcnt; i++) {
writers[i].join();
}
for (int i = 0; i < rcnt; i++) {
readers[i].join();
}
return 0;
}
这里使用printf主要是因为std::ostream不是线程安全的,因此输出的多个Tread有可能会出现在同一行,而printf则保证了线程安全。
所以std::unique_lock和std::shared_lock最大的特征就是省事,你只要将它放在合适的位置,它就能在整个它生效的作用域内自动帮你完成操作,因此我也就不再赘述了。
(8).Peterson算法
听完了这么多锁的内容,我们来讲点没有锁的,没错,人们在尝试达成正确的并发编程之前,做出了相当多的努力,Gary L. Peterson在1981年提出了Peterson算法,这是一个正确的算法,并且也意味着:很多人在这个算法前做的许多尝试大部分都是错的。
#1.访问意向
一个相当简单的并发实现思路是设定访问意向,例如对于A和B两个线程,如果都想访问一个临界区,可以设定一个意向变量,例如:
#include <iostream>
#include <thread>
using namespace std;
bool flags[2]{ false, false };
void A()
{
while (true) {
flags[0] = true;
while (flags[1]);
cout << "A" << endl;
flags[0] = false;
}
}
void B()
{
while (true) {
flags[1] = true;
while (flags[0]);
cout << "B" << endl;
flags[1] = false;
}
}
int main()
{
thread a{ A }, b{ B };
a.join();
b.join();
return 0;
}
跑跑看,死锁来得太快就像龙卷风~这个程序稍微打印一两行就会死锁,为什么呢,其实很简单,我们依旧可以构造一个死锁序列:A首先将flags[0]设定为true,然后此时直接打断,调度到B,此时B也将flags[1]设定为true,自此,无论之后谁被调度到,都会被卡在忙等待当中,也就是说:两个严格互相谦让的人在让一个梨,要是他们都不肯放弃自己的谦让,那么他们可以在这僵持一辈子,所以仅有访问意向的解决方案是错误的,它不能解决互斥访问的问题。
#2.严格轮换法
这个做法就有意思多了,它还真能实现互斥访问,但是它的办法很笨,我们来看看代码:
#include <iostream>
#include <thread>
using namespace std;
int nxt{ 0 };
void A()
{
while (true) {
while (nxt != 0);
cout << "A" << endl;
nxt = 1;
}
}
void B()
{
while (true) {
while (nxt != 1);
cout << "B" << endl;
nxt = 0;
}
}
int main()
{
thread a{ A }, b{ B };
a.join();
b.join();
return 0;
}
它可以非常好的完成两个进程之间的互斥,因为它严格保证了A的下一个操作一定是B,B的下一个操作一定是A,但是它本质也是一种忙等待,所以效率上来说,不会很高,并且对于更加复杂的问题,本质是你需要设计一个运行序列,这样一来其实就丢失了很大一部分的并发特性了
#3.Peterson算法的真实实现
Peterson算法的做法相当简单,它结合了访问意向和严格轮换两个方法,最终形成了一个正确的无锁互斥算法:
#include <iostream>
#include <thread>
#include <Windows.h>
using namespace std;
bool flags[2]{ false, false };
int turn;
int cnt = 0;
void T1()
{
while (true) {
flags[0] = true;
turn = 1;
while (flags[1] && turn == 1);
Sleep(800);
cout << "T1 access!cnt = " << cnt++ << "\n";
flags[0] = false;
}
}
void T2()
{
while (true) {
flags[1] = true;
turn = 0;
while (flags[0] && turn == 0);
cout << "T2 access!" << "\n";
flags[1] = false;
}
}
int main()
{
thread p1{ T1 }, p2{ T2 };
p1.join();
p2.join();
return 0;
}
它的核心内容主要在于这里(以T1为例):
bool flags[2]{ false, false };
int turn;
void T1()
{
while (true) {
flags[0] = true;
turn = 1;
while (flags[1] && turn == 1);
Sleep(800);
cout << "T1 access!cnt = " << cnt++ << "\n";
flags[0] = false;
}
}
Peterson算法的第一步是表明意向,将自己对应的意向标记置为true,而第二步则是最重要的,即抢座位,这一步是区别与严格轮换最重要的一步,因为严格轮换保证了顺序一定确定,而抢座位的过程则给这个过程补上了并发的特性,为什么这么说呢?因为这个turn本质上说也是一个临界区,我们不能确定两个线程谁能够让turn变成自己的编号。
而且这里还有一个设计上的巧思,在于每个线程向turn写入的值,这里有两种做法,都是正确的,第一种是向turn写入对方的值,因为临界区的写入是一个类似栈的结构,先写入的值会被后写入的覆盖,因此向turn写对方的值,如果T1线程比T2写得快,那么T2就会写入T1的值放入turn中,那么下一次就轮到T1运行了,这就是一种手快有,手慢无的设计思路。
另一种是向turn写入自己的值,而按照上面的思路,手快的会优先把自己的值写入turn,手慢的则能够优先得到执行,这个就是一种谦让的做法,这两者都是可以的,但是第一种的结果会更加符合认知一点。
#4.真的没问题吗?
我们做一个实验,这个实验需要在g++编译器的条件下进行:
#include <iostream>
#include <thread>
using namespace std;
bool flags[2]{ false, false };
int turn;
int sum{ 0 };
void T1()
{
flags[0] = true;
turn = 1;
while (flags[1] && turn == 1);
sum++;
flags[0] = false;
}
void T2()
{
flags[1] = true;
turn = 0;
while (flags[0] && turn == 0);
sum++;
flags[1] = false;
}
void thr1()
{
for (int i = 0; i < 1000000; i++) {
T1();
}
}
void thr2()
{
for (int i = 0; i < 1000000; i++) {
T2();
}
}
int main()
{
thread p1{ thr1 }, p2{ thr2 };
p1.join();
p2.join();
cout << sum << endl;
return 0;
}
编译的时候使用命令:
g++ peterson.cpp -o peterson -O1 -lpthread
看起来非常简单对吧,就是让两个线程做100w次加一操作,然后最后打印结果,然后你就会发现,这个打印的结果怎么都不对,无论编译多少次,运行多少次,都几乎得不到正确答案,这不对啊,Peterson算法不是一个正确的算法吗?
结果小于200w,肯定说明这两个线程存在同时进入临界区的情况,也就是说,在某个特定操作序列之下,Peterson算法会失效,对吗?当然不对,Peterson算法是经过证明的,它的正确性一定可以保证,这就只能说明,我们的代码在成为可执行文件之前出现了一点什么变化,所以才会有这个问题。
这个问题第一个最重要的因素在于代码乱序优化,即,指令的真实执行顺序与我们写的代码的顺序可能截然不同,但是大部分情况下我们是感受不到的,但是Peterson算法的指令顺序如果被交换,很有可能出现严重的问题:
flags[0] = true;
turn = 1;
while (flags[1] && turn == 1);
这三条指令保证了互斥访问,分别对应store、store和load,对于turn这个先store再load的变量,是绝对不能进行乱序的,因为这样一定会改变运行结果,但是flags[0]和flags[1]分属两个不同的变量,那么优化可能就会变成:先load flags[1],再store flags[0],两个线程恰好刚刚从临界区退出的时候,此时flags的两个变量均为false,如果此时首先load flags[1],线程1首先flags[1]为false,此时线程1的while循环条件为false,与此同时,线程2也把load提前,flags[0]也被读取为false,则二者此时全部满足了进入临界区的条件,于是就出现了严重的问题,因此,指令的乱序会导致Peterson算法出现严重的问题,我们需要加上内存屏障以避免load和store发生乱序,这条指令本来是需要使用汇编完成的,但是不同平台的汇编指令不同,为了可移植性,g++/gcc提供了__sync_synchronize(),因此我们可以把代码改成下面这样:
#include <iostream>
#include <thread>
using namespace std;
bool flags[2]{ false, false };
int turn;
int sum{ 0 };
void T1()
{
flags[0] = true;
turn = 1;
__sync_synchronize();
while (flags[1] && turn == 1);
sum++;
flags[0] = false;
}
void T2()
{
flags[1] = true;
turn = 0;
__sync_synchronize();
while (flags[0] && turn == 0);
sum++;
flags[1] = false;
}
void thr1()
{
for (int i = 0; i < 1000000; i++) {
T1();
}
}
void thr2()
{
for (int i = 0; i < 1000000; i++) {
T2();
}
}
int main()
{
thread p1{ thr1 }, p2{ thr2 };
p1.join();
p2.join();
cout << sum << endl;
return 0;
}
当你关掉优化编译,再运行可执行文件,就会发现结果已经对了,那为什么要关掉优化呢,因为哪怕你开的只是O1优化,最后的结果还是一样不对,所以还有更多问题。
第二个重要原因在于flags和turn上,比如还是对于这段代码:
flags[0] = true;
turn = 1;
while (flags[1] && turn == 1);
flags[1]是一个没有被写过的变量,它可能被存在寄存器中,因此从寄存器中直接读取是一个比读内存更快的选择,为了优化,这里的flags[1]可能就会是一个恰好没有被更新的值,turn也是一样,写入,再读取,如果纯粹从缓存/寄存器读取,另一个线程的turn可能就没有影响到当前线程了,因此这样一来,就会导致最终两个线程还是可以进入临界区,最后导致出错。
知道问题了,就好解决了,我们只需要一个关键字:volatile,volatile关键字用于修饰变量,要求在读取或写入前都必须从内存读取,而不是寄存器,这样就保证了上面的问题不会再出现,我们只需要将两个涉及并发访问的变量全部改成volatile修饰即可:
#include <iostream>
#include <thread>
using namespace std;
volatile bool flags[2]{ false, false };
volatile int turn;
int sum{ 0 };
void T1()
{
flags[0] = true;
turn = 1;
__sync_synchronize();
while (flags[1] && turn == 1);
sum++;
flags[0] = false;
}
void T2()
{
flags[1] = true;
turn = 0;
__sync_synchronize();
while (flags[0] && turn == 0);
sum++;
flags[1] = false;
}
void thr1()
{
for (int i = 0; i < 1000000; i++) {
T1();
}
}
void thr2()
{
for (int i = 0; i < 1000000; i++) {
T2();
}
}
int main()
{
thread p1{ thr1 }, p2{ thr2 };
p1.join();
p2.join();
cout << sum << endl;
return 0;
}
这串代码再编译,无论开几级优化,都是可以正常运行的了,真不容易啊,这其实体现出了一个关键问题:想要无锁实现正确的互斥,需要硬件级的并发支持,纯粹依靠软件要做到这件事情需要相当大的努力。
(9).信号量
说实话,信号量其实应该更早一点讲,因为它是一种古老但有效的互斥/同步措施,它有荷兰计算机科学家Dijkstra于1965年提出,没错,1965年;没错,Dijkstra,就是Dijkstra最短路算法的那个Dijkstra,信号量是一种实现相对复杂,但是效果很好的同步机制。
#1.信号量是什么
信号量是一种可以原子地加一或减一的计数变量,它能保证在计数变量为0如果再减一,则阻塞当前线程直到计数大于0。这是一个相当有用的机制,对于一个二元信号量,即最大为1的信号量,它可以充当独享的互斥锁,Dijkstra提出了信号量的P-V原语,P代表尝试使得信号量计数-1,若结果会导致其小于0,则阻塞当前线程直到信号量大于0后被唤醒;V代表使得信号量计数+1,更加详细的信号量细节可以查阅网络上的资料。
#2.标准库semaphore(C++20)
C++20引入了<semaphore>头文件,当然如果你用VS的话,貌似C++17也是可以用的,但这不重要,标准库的semaphore提供了两种信号量:
template< std::ptrdiff_t LeastMaxValue = /* 由实现定义 */ >
class counting_semaphore;
using binary_semaphore = std::counting_semaphore<1>;
其中二元信号量std::binary_semaphore实际上就是最小最大值为1的计数信号量,而计数信号量就是我们所说的信号量了,它有这些方法:
函数名 | 作用 |
---|---|
release | 增加内部计数器并解除获得者 |
acquire | 减少内部计数器或阻塞到直至能如此 |
try_acquire | 尝试减少内部计数器而不阻塞 |
try_acquire_for | 尝试减少内部计数器,至多阻塞一段时长 |
try_acquire_until | 尝试减少内部计数器,阻塞直至一个时间点 |
其中acquire对应P操作,release对应V操作,而下面的三个try则是类似std::mutex的try_lock一样的操作,因为P操作是会阻塞的,因此提供了非阻塞式的三个实现,这里就不过多介绍了(其实主要还是因为大家可能目前用的更多的是UNIX的semaphore.h)
#3.pthread.h和semaphore.h
我这一学期里发过的所有有关并发编程的内容实际上都是采用了pthread.h和semaphore.h两个库实现的,pthread就不多说了,这里简单提一下semaphore.h的用法。
semaphore.h当中定义的信号量类型为sem_t,因此你可以写出这样一串代码用于创建一个信号量:
#include <iostream>
#include <semaphore.h>
sem_t mutex;
int main()
{
sem_init(&mutex, 0, 5);
return 0;
}
在创建了变量之后,你需要调用sem_init函数对信号量进行初始化,第二个参数指示这个信号量是否在进程间共享,当为0时代表不共享,传入的第三个参数从信号量的角度来看,是它最大应当容纳的数量(你要是一直V操作它当然可以突破这个上限),需要注意的是,对于一个已经初始化过的sem_t调用sem_init是一种未定义行为,千万不要这么做。
而对应的P-V操作在semaphore.h中则定义为sem_wait和sem_post,即:
#include <iostream>
#include <semaphore.h>
sem_t mutex;
int main()
{
sem_init(&mutex, 5);
sem_wait(&mutex);
sem_post(&mutex);
return 0;
}
如果你不习惯,你可以把它用宏定义修改一下:
#define P(mutex) sem_wait(&mutex)
#define V(mutex) sem_post(&mutex)
很简单,对吧?
#4.经典同步/互斥问题
生产者—消费者问题,读者—写者问题,哲学家吃饭问题和理发师问题都是非常非常经典的同步/互斥问题。
在这里我就利用前面已经举过的例子,利用信号量进行一些改造:
#include <iostream>
#include <cstdlib>
#include <thread>
#include <semaphore.h>
#define P(mtx) sem_wait(&mtx)
#define V(mtx) sem_post(&mtx)
using namespace std;
int share{ 0 };
sem_t lk, full, em;
void init()
{
sem_init(&lk, 0, 1);
sem_init(&full, 0, 0);
sem_init(&em, 0, 1);
}
void Tproduce()
{
while (true) {
P(em);
P(lk);
share = rand() % 1000000;
cout << "Produce : " << share << endl;
V(lk);
V(full);
}
}
void Tconsume()
{
while (true) {
P(full);
P(lk);
cout << "Consume : " << share << endl;
V(lk);
V(em);
}
}
int main()
{
init();
thread p{ Tproduce }, c{ Tconsume };
p.join();
c.join();
return 0;
}
对于一个同步问题,需要一对信号量empty和full来完成,empty初始值表示初始能同时容纳的最大消费者数,full则为对应的最大生产者数,而剩下的一个lk则是互斥量,因为这里存在一个临界区share,因此需要使用lk来保护,特别需要注意的是,这里的full/empty和lk在P操作时顺序不能变,V操作时无所谓,你可以试试看这串代码:
#include <iostream>
#include <cstdlib>
#include <thread>
#include <semaphore.h>
#define P(mtx) sem_wait(&mtx)
#define V(mtx) sem_post(&mtx)
using namespace std;
int share{ 0 };
sem_t lk, full, em;
void init()
{
sem_init(&lk, 0, 1);
sem_init(&full, 0, 1);
sem_init(&em, 0, 0);
}
void Tproduce()
{
while (true) {
P(lk);
P(em);
share = rand() % 1000000;
cout << "Produce : " << share << endl;
V(lk);
V(full);
}
}
void Tconsume()
{
while (true) {
P(lk);
P(full);
cout << "Consume : " << share << endl;
V(lk);
V(em);
}
}
int main()
{
init();
thread p{ Tproduce }, c{ Tconsume };
p.join();
c.join();
return 0;
}
相信我,它很快就会死锁的,为什么呢,我们又可以思考出一个死锁序列:生产者线程首先获取lk,此时lk的计数为0,之后尝试获取em,这个例子中em初始值设定为0,则生产者线程阻塞,此时被打断,切换到消费者线程,但是消费者线程要首先获取lk,但是lk已经被生产者线程占据,因此两个线程就发生了死锁,这个例子中最严重的问题在于带锁睡眠,因此在设计使用信号量解决问题的时候,需要更加小心。
(10).条件变量
#1.条件变量是什么
信号量还是比较好用的,但是它有一个很明显的缺陷:你得给它设计好一套流程,从而避免死锁并保证同步,也就是说,我们需要让在某个信号量上等待的线程,在合适的时机被唤醒,让它继续执行指令,所以,有没有一种可能,我们直接以唤醒条件作为同步条件,一旦满足同步条件就唤醒对应的线程,这样是一种更符合直觉的同步原语呢?
条件变量帮我们解决了这个问题,条件变量绑定一个独享的互斥锁,提供了wait和notify等函数,当不满足条件时进行wait进入睡眠等待唤醒,而某个线程满足条件后调用notify_all唤醒所有在某个条件变量上等待的所有线程。
#2.std::condition_variable
C++的条件变量定义在<condition_variable>头文件中,我们在此主要使用std::condition_variable类完成所有的后续操作,它具备以下几个方法
函数名 | 作用 |
---|---|
notify_one | 通知一个等待的线程 |
notify_all | 通知所有等待的线程 |
wait | 阻塞当前线程,直到条件变量被唤醒 |
wait_for | 阻塞当前线程,直到条件变量被唤醒,或到指定时限时长后 |
wait_until | 阻塞当前线程,直到条件变量被唤醒,或直到抵达指定时间点 |
所以,我们可以利用条件变量对前面提到的生产者—消费者问题进行改造,这里的例子相较于之前的复杂了很多,我们的缓冲区改为了一个队列,并且生产者消费者的数量也发生了变化,不过其本质逻辑是不变的:
#include <iostream>
#include <cstdlib>
#include <thread>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <vector>
using namespace std;
constexpr int MAXN{ 5 }, pnum{ 2 }, cnum{ 5 };
mutex mtx;
condition_variable cv;
queue<int> buffer;
vector<thread> pthr, cthr;
void Tproduce()
{
while (true) {
unique_lock lk(mtx);
if (buffer.size() < MAXN) {
buffer.push(rand() % 100000);
cout << "Produce : " << buffer.back() << endl;
}
else {
cv.notify_all();
}
}
}
void Tconsume() {
while (true) {
unique_lock lk(mtx);
if (!buffer.empty()) {
cout << "Consume : " << buffer.front() << endl;
buffer.pop();
}
else {
cv.wait(lk);
}
}
}
void init()
{
for (int i = 0; i < pnum; i++) {
pthr.push_back(thread{ Tproduce });
}
for (int i = 0; i < cnum; i++) {
cthr.push_back(thread{ Tconsume });
}
for (int i = 0; i < pnum; i++) {
pthr[i].join();
}
for (int i = 0; i < cnum; i++) {
cthr[i].join();
}
}
int main()
{
init();
return 0;
}
是不是非常优雅呢?只有等到条件成立,才进行操作,否则就唤醒或等待直到条件成立。
#3.线程安全队列的实现
最后一个部分是消息队列,众所周知,STL中的队列并不保证线程安全,因此如果直接使用std::queue完成消息队列的功能,就可能有两个线程拿到同一个front,pop同一个元素,这可就糟了:
#include <queue>
#include <mutex>
#include <condition_variable>
template <typename T>
class MessageQueue
{
private:
std::queue<T> queue_;
std::mutex mutex_;
std::condition_variable cv_;
public:
MessageQueue() = default;
~MessageQueue() = default;
void push(const T& item) {
{
std::unique_lock lock{ mutex_ };
queue_.push(item);
}
cv_.notify_all();
}
T pop() {
std::unique_lock lock{ mutex_ };
while (queue_.empty()) {
cv_.wait(lock);
}
T front = queue_.front();
queue_.pop();
return front;
}
};
只需要一点点相当简单的代码即可完成任务,对于push操作,只需要保证push操作的互斥即可,由于push后满足所有pop线程的条件,因此直接采用notify_all唤醒所有线程即可。而对于pop操作,需要保证队列不为空,因此在队列为空时直接等待唤醒即可,这样就完成了整个消息队列的构建。
我们可以用下面的代码简单测试一下:
#include <iostream>
#include <thread>
#include <queue>
#include <mutex>
#include <condition_variable>
template <typename T>
class MessageQueue
{
private:
std::queue<T> queue_;
std::mutex mutex_;
std::condition_variable cv_;
public:
MessageQueue() = default;
~MessageQueue() = default;
void push(const T& item) {
{
std::unique_lock lock{ mutex_ };
queue_.push(item);
}
cv_.notify_all();
}
T pop() {
std::unique_lock lock{ mutex_ };
while (queue_.empty()) {
cv_.wait(lock);
}
T front = queue_.front();
queue_.pop();
return front;
}
};
MessageQueue<int> mq;
void T()
{
for (int i = 0; i < 5; i++) {
mq.push(i);
}
for (int i = 0; i < 4; i++) {
std::cout << mq.pop() << std::endl;
}
}
int main()
{
std::thread t1{ T }, t2{ T }, t3{ T };
t1.join(), t2.join(), t3.join();
for (int i = 0; i < 3; i++) {
std::cout << mq.pop() << std::endl;
}
return 0;
}
最终结果应该会打印3组0~4,如果直接使用std::queue就有可能会产生数字丢失的问题。
小结
这算是C++这一个系列当中内容最多、范围最广并且难度最大的一章了,也如我前面所说的,这一章是从2023年9月份开始撰写的,一直到今天我才有了足够的知识储备完成整章的内容。
但是并发和多线程以及高性能C++的旅途还远未结束,本篇所介绍的所有内容完全只能算是起步,后续的博客中我可能还会介绍诸如<future>中的异步编程,基于C++20中的闩和屏障完成的更加复杂的同步,线程池等等内容,但是更新时间就不能保证了哈哈哈哈哈哈,希望大家能够有所收获吧。
参考资料
- 1.[哔哩哔哩] 并发控制:基础 (Peterson 算法、模型检验、原子操作)
- 2.[博客园] 内存栅栏(memory barrier):解救peterson算法的应用陷阱
- 3.[知乎] 对int变量赋值的操作是原子的吗?
- 4.[哔哩哔哩] 同步:生产者-消费者与条件变量 (算法并行化;万能同步方法)
- 5.[哔哩哔哩] 同步:信号量与哲♂学家吃饭问题 (信号量的正确打开方式)
- 6.[CSDN] C++ 多线程编程(二):pthread的基本使用
- 7.pthread_mutex_lock
- 8.Mutex lock for Linux Thread Synchronization
- 9.[哔哩哔哩] 并发控制:互斥 (问题定义与假设;自旋锁;互斥锁)
- 10.[Stack Overflow] stdout thread-safe in C on Linux?
- 11.[CSDN] C++ 多线程编程(二):pthread的基本使用
- 12.[CSDN] pthread 自旋锁使用详解
- 13.[GNU] GLIBC: Formatted Output Functions
- 14.[cppreference] 并发支持库
- 15.[CSDN] C++ 多线程学习(3) ---- 条件变量