1. 问题引入
在我们之前的生产者与消费者问题中, 在文章的最后, 我们曾尝试过把我们的代码封装成P()和V()操作, 结果却以失败告终. 归根结底是因为我们无法在不使用mutex的情况下来完成对临界区的互斥访问, 本篇文章我们就来探讨一下, 如何不使用mutex实现临界区的互斥访问.
2. 寻找线程安全的操作
我们封装P()和V()操作失败的根源在于我们封装出来的方法并不是原子操作. 它们是有可能被打断的, 也就是说它们不是线程安全的代码. 所以我们需要写出线程安全的代码才能实现对临界区的互斥访问. 哪些操作是线程安全的? 之前我们在读者与写者问题中提到过, 读取操作是线程安全的. 除此之外, 还有什么操作是线程安全的? 这里就不卖关子了, 如果一个操作是给整型变量赋值一个确定的值(例如int a = 1; ), 那么这个操作一般是原子的, 因此它也是线程安全的操作.
3. 临界区互斥访问的基本方法
1. 轮换法
我们仍然从最简单的情况说起, 考虑只有两个线程. 回想我们之前的互斥访问的操作, 线程在访问临界区之前, 先看一下锁与钥匙有没有被占用, 如果被占用, 该线程就进入等待状态; 如果没有被占用, 该线程就可以顺利访问临界区. 访问临界区之后, 归还锁与钥匙. 这里我们也可以用同样的思路, 线程访问临界区之前, 读取(这个操作是线程安全的)一个标志位trun, 如果这个标志位允许该线程进入临界区, 那么该线程就可以继续执行; 否则就让这个线程陷入死循环, 即线程进入等待状态. 线程从临界区出来以后, 改变这个标志位的值(这个操作也是线程安全的), 让另一个线程可以从死循环中出来. 比如有两个线程T0和T1, turn == 0时, T0可以进入临界区, turn==1时, T1可以进入临界区. 代码如下:
#include <iostream>
#include <thread>
#include <windows.h>
#include <condition_variable>
#include <mutex>
#include <chrono>
/// <summary>
/// 标志位, turn == i时, 线程i可以访问临界区
/// </summary>
int turn = 0;
/// <summary>
/// 临界区代码中的操作数
/// </summary>
int num = 0;
/// <summary>
/// 临界区代码
/// </summary>
void critical_region_fun(int index) {
printf("线程%d访问临界区, 操作数的值: %d\n", index, ++num);
}
/// <summary>
/// 线程0函数
/// </summary>
void thread_fun0() {
while (true) {
while (turn != 0) //没有轮到自己, 则陷入死循环
{
//陷入死循环, 线程进入等待状态
}
//访问临界区
critical_region_fun(0);
std::this_thread::sleep_for(std::chrono::seconds(1)); //当前线程阻塞1秒
turn = 1; //访问临界区的权限交给另外一个线程
}
}
/// <summary>
/// 线程1函数
/// </summary>
void thread_fun1() {
while (true) {
while (turn != 1) //没有轮到自己, 则陷入死循环
{
//陷入死循环, 线程进入等待状态
}
//访问临界区
critical_region_fun(1);
std::this_thread::sleep_for(std::chrono::seconds(1)); //当前线程阻塞1秒
turn = 0; //访问临界区的权限交给另外一个线程
}
}
int main()
{
std::thread thread0(thread_fun0); //线程0
std::thread thread1(thread_fun1); //线程1
thread0.join(); //等待线程结束
thread1.join();
return 0;
}
代码1: 轮换法(满足互斥访问, 不满足空闲让进, 不满足有限等待)
我们在临界区中实现了num的自增操作. 运行结果如下:
可以看出线程0与线程1轮流访问临界区, 这就是轮换法名称的由来. 关于轮换法是否可以实现互斥访问, 可以用反证法证明: 假设两个线程同时访问临界区, 则说明两个线程都跳出了死循环, 则两个while的条件均不满足, 即turn == 0 且turn == 1, 矛盾! 故假设不成立. 故两个线程不可能同时访问临界区. 因此轮换法可以保证互斥访问临界区. 那么这段代码还有别的问题吗?
我们还是考虑最简单的一种情况: 只有一个线程0, 会发生什么情况呢? 首先turn == 0, 跳过while循环, 然后访问临界区的代码, 然后将turn置为1. 此时由于只有一个线程0, 因此没有其它的线程将turn置为0, 因此线程0就会一直执行while的死循环语句, 再也无法跳出循环. 此时的临界区明明是空闲状态, 线程0却无法访问, 而且如果没有其它的线程, 线程0会这么一直等待下去. 这显然是不合理的.
下面我们就来总结一下实现临界区互斥访问的设计原则:
互斥访问: 如果有一个进程(线程)处在临界区中, 则其余进程(线程)不能进入. 互斥访问保证协作进程(线程)的正确运行;
空闲让进: 多个进程(线程)等待进入临界区时, 只要临界区为空, 应尽快使某一个进程(线程)进入, 空闲让进保证进程(线程)协作高效运行;
有限等待: 从进程(线程)发出请求到允许进入, 不能无线等待;
让权等待(非必须实现): 进程(线程)不能进入临界区时, 应立即释放处理器, 防止进程(线程)忙等待. 这个不是必须实现的. 比如上面的代码中, 线程0访问临界区时, 线程1一直执行死循环, 占用了处理器, 这个是允许的.
可以看出轮换法不满足空闲让进, 也不满足有限等待. 如果只有一个线程0, 且没有外界干预的情况下, 线程0执行完一轮之后, 就永远卡在这了.
2. 标记法
轮换法的问题在于, 只用一个turn来标记可以访问临界区的线程是不够的, 我们需要用多个标记来记录线程是否可以访问临界区, 只要其余线程无法访问临界区, 且当前线程可以访问临界区, 那么当前线程就可以进入临界区执行. 显然可以用一个bool类型的数组flag[ ] 来实现, flag[ i ] 为true代表线程 i 可以访问临界区; flag[ i ] 为false, 代表线程 i 无法访问临界区. 代码如下:
#include <iostream>
#include <thread>
#include <windows.h>
#include <condition_variable>
#include <mutex>
#include <chrono>
/// <summary>
/// 标志位数组. falg[i] == true时, 线程i可以访问临界区; falg[i] == false时, 线程i无法访问临界区
/// </summary>
bool flag[] = {false, false};
/// <summary>
/// 临界区代码中的操作数
/// </summary>
int num = 0;
/// <summary>
/// 临界区代码
/// </summary>
void critical_region_fun(int index) {
printf("线程%d访问临界区, 操作数的值: %d\n", index, ++num);
}
/// <summary>
/// 线程0函数
/// </summary>
void thread_fun0() {
while (true) {
flag[0] = true; //线程0需要访问临界区, 将flag[0]置为true
while (flag[1] == true) //没有轮到自己, 则陷入死循环
{
//陷入死循环, 线程进入等待状态
}
//访问临界区
critical_region_fun(0);
std::this_thread::sleep_for(std::chrono::seconds(1)); //当前线程阻塞1秒
flag[0] = false; //线程0已结束对临界区的访问, 交出临界区的访问权限.
}
}
/// <summary>
/// 线程1函数
/// </summary>
void thread_fun1() {
while (true) {
flag[1] = true; //线程1需要访问临界区, 将flag[1]置为true
while (flag[0] == true) //没有轮到自己, 则陷入死循环
{
//陷入死循环, 线程进入等待状态
}
//访问临界区
critical_region_fun(1);
std::this_thread::sleep_for(std::chrono::seconds(1)); //当前线程阻塞1秒
flag[1] = false; //线程1已结束对临界区的访问, 交出临界区的访问权限.
}
}
int main()
{
std::thread thread0(thread_fun0); //线程0
std::thread thread1(thread_fun1); //线程1
thread0.join(); //等待线程结束
thread1.join();
return 0;
}
代码2: 标志法(满足互斥访问, 不满足空闲让进, 不满足有限等待)
先看看这段代码能否保证互斥访问, 还是用反证法, 假设线程0和线程1都进入了临界区, 则两个线程均跳出了循环, 则有flag[0] == false且flag[1] == false; 又由两个线程均执行了flag[ i ] = true的操作, 则有flag[0] == true且flag[1] == true, 矛盾! 故假设不成立, 故标志法可以保证互斥访问. 再看只有线程0的情况下, 线程0能否正常运行. 易得flag[1]恒为false, 故线程0不会执行死循环, 故单个线程下可以正常运行. 再看是否满足空闲让进和有限等待. 假设有两个线程, 线程0执行了flag[0] = true; 然后发生调度, 执行线程1的代码flag[1] = true; 然后发生调度, 执行线程0, 此时flag[1] == true; 线程0陷入死循环, 然后发生调度, 执行线程1, 此时flag[0] == true, 线程1也陷入死循环. 此时临界区空闲, 两个线程却都进不来, 而且如果没有外界干预, 两个线程会无限地等待下去. 因此标志法不满足空闲让进, 不满足有限等待.
3. 皮特森(Peterson)算法
标志法的问题在于, 在判断flag[ i ] 是否为true时, 即使flag[ i ] 为true, 线程i也有可能还没进入临界区, 它甚至连while循环都没进入, 这就有可能导致两个线程都进入了死循环. 因此我们需要有一个标记, 这个标记需要保证两个线程至少有一个可以跳出while循环. 我们可以把轮换法中的turn拿过来, 在线程 i 进入临界区之前, 将turn置为 j , 在while循环的判断里面加上 && turn == j 或者 && turn == i; 这样turn就只有值可以取, 只有就可以保证两个while循环中至少有一个可以跳出. 代码如下:
#include <iostream>
#include <thread>
#include <windows.h>
#include <condition_variable>
#include <mutex>
#include <chrono>
/// <summary>
/// 标志位数组. falg[i] == true时, 线程i可以访问临界区; falg[i] == false时, 线程i无法访问临界区
/// </summary>
bool flag[] = {false, false};
/// <summary>
/// 标志位, turn == i时, 线程i可以访问临界区
/// </summary>
int turn = 0;
/// <summary>
/// 临界区代码中的操作数
/// </summary>
int num = 0;
/// <summary>
/// 临界区代码
/// </summary>
void critical_region_fun(int index) {
printf("线程%d访问临界区, 操作数的值: %d\n", index, ++num);
}
/// <summary>
/// 线程0函数
/// </summary>
void thread_fun0() {
while (true) {
flag[0] = true; //线程0需要访问临界区, 将flag[0]置为true
turn = 1; //线程1可以访问临界区
while (flag[1] == true && turn == 1) //没有轮到自己, 则陷入死循环
{
//陷入死循环, 线程进入等待状态
}
//访问临界区
critical_region_fun(0);
std::this_thread::sleep_for(std::chrono::seconds(1)); //当前线程阻塞1秒
flag[0] = false; //线程0已结束对临界区的访问, 交出临界区的访问权限.
}
}
/// <summary>
/// 线程1函数
/// </summary>
void thread_fun1() {
while (true) {
flag[1] = true; //线程1需要访问临界区, 将flag[1]置为true
turn = 0; //线程0可以访问临界区
while (flag[0] == true && turn == 0) //没有轮到自己, 则陷入死循环
{
//陷入死循环, 线程进入等待状态
}
//访问临界区
critical_region_fun(1);
std::this_thread::sleep_for(std::chrono::seconds(1)); //当前线程阻塞1秒
flag[1] = false; //线程1已结束对临界区的访问, 交出临界区的访问权限.
}
}
int main()
{
std::thread thread0(thread_fun0); //线程0
std::thread thread1(thread_fun1); //线程1
thread0.join(); //等待线程结束
thread1.join();
return 0;
}
代码3: 皮特森(Peterson)算法(满足互斥访问, 满足空闲让进, 满足有限等待)
先看皮特森算法是否满足互斥访问, 同样使用反证法. 假设两个线程都进入了临界区, 则falg[0]和flag[1]均为true, 且两个while循环都跳过了, 则有turn != 0 且turn != 1, 矛盾! 故假设不成立, 故该算法满足互斥访问.
再看是否满足空闲让进. 假设临界区空闲, 即没有线程要访问临界区, 则flag[0]和flag[1]均为false, 故两个while循环的判断条件均为false, 故两个while循环均可以跳出, 可以让新来的线程访问临界区, 因此满足空闲让进.
再看是否满足有限等待. 由于turn的取值只有0和1, 因此while循环的判断条件必有一个为false, 因此当两个线程访问临界区时, 必有一个线程 i 可以跳出while循环, 从而访问临界区, 而线程 j 则陷入死循环. 当线程 i 访问完临界区以后, 将flag[ i ]置为false, 则线程 j 的while循环条件不满足, 线程 j 就可以跳出死循环, 进而访问临界区了, 因此满足有限等待.