本篇文章我们使用C++探讨一下读者与写者问题.
1. 读者与写者问题引入
读写操作是计算机中对存储区最常见的两种操作. 我们之前探讨了生产者与消费者问题, 知道了如何开启多个线程, 现在就可以直接写出读者与写者问题的最基本的代码了, 仍从最简单的情况开始--两个读者, 代码如下:
#include <iostream>
#include <thread>
#include <windows.h>
#include <condition_variable>
#include <mutex>
#include <chrono>
/// <summary>
/// 要读取或写入的数据, 初始值为0
/// </summary>
int num = 0;
/// <summary>
/// 读者函数
/// </summary>
void reader_fun(int index) {
while (true)
{
printf("读者%d读取数据, 读取的数据: %d\n", index, num);
std::this_thread::sleep_for(std::chrono::seconds(1)); //当前线程阻塞1秒
}
}
int main()
{
std::thread reader_thread1(reader_fun, 1); //读者线程1
std::thread reader_thread2(reader_fun, 2); //读者线程2
reader_thread1.join(); //等待线程结束
reader_thread2.join();
return 0;
}
代码1: 两个读者(没有对线程操作进行任何限制)
运行结果如下:
虽然我们没有对线程操作做任何的限制, 但是运行结果没有任何问题, 这是因为读操作是线程安全的, 因为读操作不会改变数据的值, 可以保证其它线程在读操作之后访问数据的时候, 数据仍是读操作之前的值. 所以读者线程可以被另一个读者线程打断, 即多个读者可以同时读.
那两个写者呢? 这里我们以++num作为对num的写入操作, 代码如下:
#include <iostream>
#include <thread>
#include <windows.h>
#include <condition_variable>
#include <mutex>
#include <chrono>
/// <summary>
/// 要读取或写入的数据, 初始值为0
/// </summary>
int num = 0;
/// <summary>
/// 写者函数
/// </summary>
void writer_fun(int index) {
while (true)
{
printf("写者%d写入数据, 写后的数据: %d\n", index, ++num);
std::this_thread::sleep_for(std::chrono::seconds(1)); //当前线程阻塞1秒
}
}
int main()
{
std::thread writer_thread1(writer_fun, 1); //读者线程1
std::thread writer_thread2(writer_fun, 2); //读者线程2
writer_thread1.join(); //等待线程结束
writer_thread2.join();
return 0;
}
代码2: 两个写者(没有对线程操作进行任何限制)
运行结果如下:
不出所料, 运行结果出现了问题, 不过不慌, 我们已经探讨过了生产者与消费者问题, 这个问题和之前是一样的, 因为当前的写入操作(++num)不是原子操作, 在多线程的环境下它的执行顺序会被打乱. 可以看出, 写者的线程不能被写者线程打乱, 即写者在写的时候其它写者不能写. 解决方法也很简单, 加个锁即可, 代码如下:
#include <iostream>
#include <thread>
#include <windows.h>
#include <condition_variable>
#include <mutex>
#include <chrono>
/// <summary>
/// 要读取或写入的数据, 初始值为0
/// </summary>
int num = 0;
std::mutex writer_mtx; //互斥量, 即锁和钥匙
/// <summary>
/// 写者函数
/// </summary>
void writer_fun(int index) {
while (true)
{
writer_mtx.lock();
printf("写者%d写入数据, 写后的数据: %d\n", index, ++num);
std::this_thread::sleep_for(std::chrono::seconds(1)); //当前线程阻塞1秒
writer_mtx.unlock();
}
}
int main()
{
std::thread writer_thread1(writer_fun, 1); //写者线程1
std::thread writer_thread2(writer_fun, 2); //写者线程2
writer_thread1.join(); //等待线程结束
writer_thread2.join();
return 0;
}
代码3: 两个写者(加了锁, 以解决互斥问题)
运行结果如下:
运行结果没有问题, 轻松解决, 现在我们再看一下一个读者一个写者的问题, 代码如下:
#include <iostream>
#include <thread>
#include <windows.h>
#include <condition_variable>
#include <mutex>
#include <chrono>
/// <summary>
/// 要读取或写入的数据, 初始值为0
/// </summary>
int num = 0;
/// <summary>
/// 读者函数
/// </summary>
void reader_fun() {
while (true)
{
printf("读者读取数据, 读取的数据: %d\n", num);
std::this_thread::sleep_for(std::chrono::seconds(1)); //当前线程阻塞1秒
}
}
/// <summary>
/// 写者函数
/// </summary>
void writer_fun() {
while (true)
{
printf("写者写入数据, 写后的数据: %d\n", ++num);
std::this_thread::sleep_for(std::chrono::seconds(1)); //当前线程阻塞1秒
}
}
int main()
{
std::thread reader_thread(reader_fun); //读者线程
std::thread writer_thread(writer_fun); //写者线程
reader_thread.join(); //等待线程结束
writer_thread.join();
return 0;
}
代码4: 一个读者, 一个写者(没有对线程操作进行任何限制)
这里我们先取消对写者的加锁操作, 看一下会不会有问题:
看来不加锁是会出现问题的, 这里写者写入后的数据是1, 但是读者读取的数据却是0, 断点之后会发现, 在读者执行printf代码时, 发生了调度, 转而执行了写者线程, 在写者线程执行完printf之后, 继续执行读者的printf代码, 此时printf语句中记录的num值仍然是num执行写者代码之前的值, 因此输出是0. 可见单纯的一句printf语句也不是原子操作, 它是可能被打断的. 看来读者进程不能被写者打断, 即读者在读的时候写者不能写, 在一个读者和一个写者的情况下, 读者的代码需要加锁. 把读者的代码加锁, 代码如下:
#include <iostream>
#include <thread>
#include <windows.h>
#include <condition_variable>
#include <mutex>
#include <chrono>
/// <summary>
/// 要读取或写入的数据, 初始值为0
/// </summary>
int num = 0;
std::mutex reader_mtx; //访问num的互斥量
/// <summary>
/// 读者函数
/// </summary>
void reader_fun() {
while (true)
{
reader_mtx.lock();
printf("读者读取数据, 读取的数据: %d\n", num);
std::this_thread::sleep_for(std::chrono::seconds(1)); //当前线程阻塞1秒
reader_mtx.unlock();
}
}
/// <summary>
/// 写者函数
/// </summary>
void writer_fun() {
while (true)
{
printf("写者写入数据, 写后的数据: %d\n", ++num);
std::this_thread::sleep_for(std::chrono::seconds(1)); //当前线程阻塞1秒
}
}
int main()
{
std::thread reader_thread(reader_fun); //读者线程
std::thread writer_thread(writer_fun); //写者线程
reader_thread.join(); //等待线程结束
writer_thread.join();
return 0;
}
代码5: 一个读者, 一个写者(只对读者加锁)
运行结果如下:
还是有问题, 读者先读, 读到的数据居然是1, 显然是写者在写的时候, 只执行了++num的操作但还没有输出, 就发生了调度, 转而去执行了读者线程的代码, 导致了读者线程读取的是num自增后的值, 然后又发生了调度, 去执行写者的代码, 输出写后的值为1. 看来写者进程也不能被打断, 即写者在写的时候读者不能读.
这也是很显然的, 就好像读者申请了锁和钥匙才能访问num, 而写者没有任何限制就可以直接闯入屋子对num进行操作, 这显然是不合理的. 因此我们也需要对写者的写操作进行上锁操作, 代码如下:
#include <iostream>
#include <thread>
#include <windows.h>
#include <condition_variable>
#include <mutex>
#include <chrono>
/// <summary>
/// 要读取或写入的数据, 初始值为0
/// </summary>
int num = 0;
std::mutex mtx; //访问num的互斥量
/// <summary>
/// 读者函数
/// </summary>
void reader_fun() {
while (true)
{
mtx.lock();
printf("读者读取数据, 读取的数据: %d\n", num);
std::this_thread::sleep_for(std::chrono::seconds(1)); //当前线程阻塞1秒
mtx.unlock();
}
}
/// <summary>
/// 写者函数
/// </summary>
void writer_fun() {
while (true)
{
mtx.lock();
printf("写者写入数据, 写后的数据: %d\n", ++num);
std::this_thread::sleep_for(std::chrono::seconds(1)); //当前线程阻塞1秒
mtx.unlock();
}
}
int main()
{
std::thread reader_thread(reader_fun); //读者线程
std::thread writer_thread(writer_fun); //写者线程
reader_thread.join(); //等待线程结束
writer_thread.join();
return 0;
}
代码6: 一个读者, 一个写者(读者和写者都加了锁)
这段代码运行起来没有逻辑上的错误, 读者可以自行尝试.
2. 读者优先
到这里你可能会觉得: 原来这就是读者与写者问题啊, 看上去并不是很难嘛, 我们只用了一个锁就解决了这个互斥访问的问题, 甚至都没有涉及到同步问题, 感觉还没有生产者与消费者问题难呢. 别急, 我们先看看我们之前的要求代码6是否满足, 我们的要求如下: ①多个读者可以同时读; ②写者在写的时候其它写者不能写; ③读者在读的时候写者不能写; ④写者在写的时候读者不能读. 显然②③④是代码6满足的, 那①呢? 代码6里面只有一个读者, 如果有多个读者的话, 第一个读者线程在读的时候会锁上屋子, 其它的读者就进不来了, 因此代码6不满足①. 看来我们又有新的问题要解决了.
解决问题的关键在于, 第一个读者拿到钥匙进入屋子之后到出屋子之前这段时间内, 不应该无脑地对所有的线程锁上屋子, 它可以拒绝写者线程进入屋子, 但它不应该拒绝读者线程进入屋子. 翻译成代码就是, 第一个读者执行了mtx.lock();以后, 在它出屋子之前, 即执行mtx.unlock();之前, 其它的读者都可以进入屋子. 在第一个读者读完之后, 它要出屋子的时候, 如果屋子里还有其它的读者线程, 那么第一个读者也不应该归还锁和钥匙的权限, 即不应该执行mtx.unlock(); 也就是说: 只要屋子里面有一个读者线程, 其它读者就可以随时进入屋子; 只要屋子里面有一个读者线程, 写者就不能进入屋子. 这就是读者优先的读者与写者问题.
有了思路我们就可以写代码了, 我们可以用一个字段reader_count来记录当前的读者线程的数量, 进屋子之前, 先++reader_count, 只要reader_count==1, 就申请锁和钥匙. 在出屋子前, 先--reader_count, 只要reader_count==0, 就归还锁和钥匙. 代码如下:
#include <iostream>
#include <thread>
#include <windows.h>
#include <condition_variable>
#include <mutex>
#include <chrono>
/// <summary>
/// 要读取或写入的数据, 初始值为0
/// </summary>
int num = 0;
/// <summary>
/// 读者的数量
/// </summary>
int reader_count = 0;
std::mutex mtx; //访问num的互斥量
/// <summary>
/// 读者函数
/// </summary>
void reader_fun(int index) {
while (true)
{
++reader_count;
if (reader_count == 1) {
mtx.lock();
}
printf("读者%d读取数据, 读取的数据: %d\n", index, num);
std::this_thread::sleep_for(std::chrono::seconds(1)); //当前线程阻塞1秒
--reader_count;
if (reader_count == 0) {
mtx.unlock();
}
}
}
/// <summary>
/// 写者函数
/// </summary>
void writer_fun() {
while (true)
{
mtx.lock();
printf("写者 写入数据, 写后的数据: %d\n", ++num);
std::this_thread::sleep_for(std::chrono::seconds(1)); //当前线程阻塞1秒
mtx.unlock();
}
}
int main()
{
std::thread reader_thread1(reader_fun, 1); //读者线程
std::thread reader_thread2(reader_fun, 2); //读者线程
std::thread writer_thread(writer_fun); //写者线程
reader_thread1.join(); //等待线程结束
reader_thread2.join();
writer_thread.join();
return 0;
}
代码7: 两个读者, 一个写者(用reader_count记录读者数量)
先不运行这段代码, 当我们看到多线程的代码中有对同一个变量的自增或者自减操作的时候, 我们应该已经敏感起来了. 这里有两个读者线程对reader_count进行操作, 我们却没有加以任何限制, 肯定是会出问题的, 不信可以运行一下看看:
果不其然, 写者写入的数据已经是4了, 读者读取的数据还是3, 其实上面两个读者线程读取的数据一直是0就已经出现问题了. 随便列举一种情况: 读者1执行++reader_count操作, reader_count为1, 然后发生调度, 转而读者2执行++reader_count操作, reader_count为2. 然后读者1执行第一个if语句, 不满足条件, 跳过; 然后读者2执行第一个if语句, 不满足条件, 跳过; 这样两个线程都没有执行mtx.lock(); 就进入屋子去读num的值了. 这时再来个写者线程, 这个写者线程就可以直接执行mtx.lock(); 拿到锁和钥匙的权限进入屋子里了, 这下读者还在读呢, 写者就进屋子了, 肯定是不允许的. 因此可以看出, 读者对readerCount的操作和if语句块是不能被打断执行的, 因此我们可以给它们也加上锁, 代码如下:
#include <iostream>
#include <thread>
#include <windows.h>
#include <condition_variable>
#include <mutex>
#include <chrono>
/// <summary>
/// 要读取或写入的数据, 初始值为0
/// </summary>
int num = 0;
/// <summary>
/// 读者的数量
/// </summary>
int reader_count = 0;
std::mutex mtx; //访问num的互斥量
std::mutex reader_count_mtx; //访问reader_count的互斥量, 即锁和钥匙
/// <summary>
/// 读者函数
/// </summary>
void reader_fun(int index) {
while (true)
{
reader_count_mtx.lock();
++reader_count;
if (reader_count == 1) {
mtx.lock();
}
reader_count_mtx.unlock();
printf("读者%d读取数据, 读取的数据: %d\n", index, num);
std::this_thread::sleep_for(std::chrono::seconds(1)); //当前线程阻塞1秒
reader_count_mtx.lock();
--reader_count;
if (reader_count == 0) {
mtx.unlock();
}
reader_count_mtx.unlock();
}
}
/// <summary>
/// 写者函数
/// </summary>
void writer_fun() {
while (true)
{
mtx.lock();
printf("写者 写入数据, 写后的数据: %d\n", ++num);
std::this_thread::sleep_for(std::chrono::seconds(1)); //当前线程阻塞1秒
mtx.unlock();
}
}
int main()
{
std::thread reader_thread1(reader_fun, 1); //读者线程
std::thread reader_thread2(reader_fun, 2); //读者线程
std::thread writer_thread(writer_fun); //写者线程
reader_thread1.join(); //等待线程结束
reader_thread2.join();
writer_thread.join();
return 0;
}
代码7: 两个读者, 一个写者(对reader_count的操作加上锁)
这段代码可以正常运行, 读者可以自行尝试. 需要注意的是, 运行结果可能是两个读者线程一直在读数据, 却一直没有调度到写者线程; 也可能是写者线程一直在写数据, 却一直没有调度到读者线程, 这是操作系统的资源分配的问题, 我们可以不用关心. 我们需要关心的是: 如果我们不只创建两个读者线程, 而是一直不停地创建读者线程, 那么就会导致屋子里面一直存在着读者线程, 那么锁与钥匙的权限就迟迟得不到归还, 就会导致写者线程饥饿甚至饿死. 我们称代码7为读者优先的读者写者问题.
事实上, 代码7运行一段时间之后会报错, 报错弹窗如下:
控制台报错内容: D:\a\_work\1\s\src\vctools\crt\github\stl\src\mutex.cpp(164): unlock of unowned mutex. 这个报错是读者线程中的mtx.unlock(); 这句代码导致的. 这个报错的原因是我们在线程a中执行了mtx.lock(); 操作, 却在线程b中执行了mtx.unlock(); 操作, 这是不被允许的. 读者可以在mtx.lock(); 和mtx.unlock(); 这两句代码之前打印出当前进程的索引, 会发现当打印出的两个索引不一样时就会出现上图中的报错. 因为lock()与unlock()操作在同一个线程中必须成对出现. 因此这里我们不能用mutex来管理对num的访问, 用条件变量(线程的管理队列)会更好一点. 思考一下, 线程管理队列需要管理哪些线程? 因为这个num是读写线程都需要访问的, 所以这个线程的管理队列对读者和写者线程都需要进行管理. 改写后的代码如下:
#include <iostream>
#include <thread>
#include <windows.h>
#include <condition_variable>
#include <mutex>
#include <chrono>
/// <summary>
/// 要读取或写入的数据, 初始值为0
/// </summary>
int num = 0;
/// <summary>
/// 读者的数量
/// </summary>
int reader_count = 0;
/// <summary>
/// 互斥信号量, 初值为1
/// </summary>
int mutex = 1;
std::mutex mtx; //访问num的互斥量
std::mutex reader_count_mtx; //访问reader_count的互斥量, 即锁和钥匙
std::condition_variable reader_and_write_cv; //条件变量, 读者和写者线程的管理队列
/// <summary>
/// 读者函数
/// </summary>
void reader_fun(int index) {
while (true)
{
reader_count_mtx.lock();
++reader_count;
if (reader_count == 1) {
std::unique_lock<std::mutex> lock(mtx);
reader_and_write_cv.wait(lock, []() { return mutex == 1; }); //若mutex == 1, 则当前线程可以继续执行, 否则阻塞当前线程
mutex = 0; //占有锁与钥匙
}
reader_count_mtx.unlock();
printf("读者%d读取数据, 读取的数据: %d\n", index, num);
std::this_thread::sleep_for(std::chrono::seconds(1)); //当前线程阻塞1秒
reader_count_mtx.lock();
--reader_count;
if (reader_count == 0) {
mutex = 1; //归还锁与钥匙
reader_and_write_cv.notify_all();
}
reader_count_mtx.unlock();
}
}
/// <summary>
/// 写者函数
/// </summary>
void writer_fun() {
while (true)
{
std::unique_lock<std::mutex> lock(mtx);
reader_and_write_cv.wait(lock, []() { return reader_count == 0 && mutex == 1; }); //若reader_count == 0 && mutex == 1, 则当前线程可以继续执行, 否则阻塞当前线程
mutex = 0; //占有锁与钥匙
printf("写者 写入数据, 写后的数据: %d\n", ++num);
std::this_thread::sleep_for(std::chrono::seconds(1)); //当前线程阻塞1秒
mutex = 1; //归还锁与钥匙
reader_and_write_cv.notify_all();
}
}
int main()
{
std::thread reader_thread1(reader_fun, 1); //读者线程
std::thread reader_thread2(reader_fun, 2); //读者线程
std::thread writer_thread(writer_fun); //写者线程
reader_thread1.join(); //等待线程结束
reader_thread2.join();
writer_thread.join();
return 0;
}
代码8: 两个读者, 一个写者(用条件变量管理mutex)
第一个读者线程进入时, 如果mutex==1, 则可以继续执行, 其它读者线程也可以顺利访问num, 最后一个读者离开时, 则归还mutex的使用权, 并唤醒所有线程. 写者线程进入时, 如果mutex == 1 && readerCount == 0, 则可以继续执行, 否则阻塞.
最终的代码如下:
#include <iostream>
#include <thread>
#include <windows.h>
#include <condition_variable>
#include <mutex>
#include <chrono>
/// <summary>
/// 要读取或写入的数据, 初始值为0
/// </summary>
int num = 0;
/// <summary>
/// 读者的数量
/// </summary>
int reader_count = 0;
/// <summary>
/// 互斥信号量, 初值为1
/// </summary>
int mutex = 1;
std::mutex mtx; //访问num的互斥量
std::mutex reader_count_mtx; //访问reader_count的互斥量, 即锁和钥匙
std::condition_variable reader_and_write_cv; //条件变量, 读者和写者线程的管理队列
/// <summary>
/// 读者函数
/// </summary>
void reader_fun(int index) {
while (true)
{
//P(reader_count_mtx)
reader_count_mtx.lock();
++reader_count;
if (reader_count == 1) {
//P(mtx)
std::unique_lock<std::mutex> lock(mtx);
reader_and_write_cv.wait(lock, []() { return mutex == 1; }); //若mutex == 1, 则当前线程可以继续执行, 否则阻塞当前线程
mutex = 0; //占有锁与钥匙
}
//V(reader_count_mtx)
reader_count_mtx.unlock();
//read()
printf("读者%d读取数据, 读取的数据: %d\n", index, num);
std::this_thread::sleep_for(std::chrono::seconds(1)); //当前线程阻塞1秒
//P(reader_count_mtx)
reader_count_mtx.lock();
--reader_count;
if (reader_count == 0) {
//V(mtx)
mutex = 1; //归还锁与钥匙
reader_and_write_cv.notify_all();
}
//V(reader_count_mtx)
reader_count_mtx.unlock();
}
}
/// <summary>
/// 写者函数
/// </summary>
void writer_fun() {
while (true)
{
//P(mtx)
std::unique_lock<std::mutex> lock(mtx);
reader_and_write_cv.wait(lock, []() { return reader_count == 0 && mutex == 1; }); //若reader_count == 0 && mutex == 1, 则当前线程可以继续执行, 否则阻塞当前线程
mutex = 0; //占有锁与钥匙
//write()
printf("写者 写入数据, 写后的数据: %d\n", ++num);
std::this_thread::sleep_for(std::chrono::seconds(1)); //当前线程阻塞1秒
//V(mtx)
mutex = 1; //归还锁与钥匙
reader_and_write_cv.notify_all();
}
}
int main()
{
std::thread reader_thread1(reader_fun, 1); //读者线程
std::thread reader_thread2(reader_fun, 2); //读者线程
std::thread writer_thread(writer_fun); //写者线程
reader_thread1.join(); //等待线程结束
reader_thread2.join();
writer_thread.join();
return 0;
}
代码9: 读者写者问题(读者优先)
3. 写者优先
读者优先的问题在哪里? 问题在于读者的优先权太高了, 只要屋子里有读者, 后面的读者就可以进入屋子, 哪怕屋子外面写者先来, 后来的读者也可以无视写者的存在, "插队"进入屋子. 现在我们将这个规则改一下: 如果一个读者申请进行读操作时已经有另一个写者在等待写, 则该读者必须等待到没有写者处于等待状态后才能进行读操作(也即读者必须等待所有的写者写完才能进行读取操作). 这就是写者优先的读者写者问题.
怎么实现写者优先? 看我们的要求, 想象这样的场景, 屋子外面是读者和写者在排队, 我们只关心队列的第一个位置, 这个位置比较特殊, 如果这个位置上是读者, 那么它就只能容纳一个人; 如果这个位置上是写者, 那么后来的所有写者都可以无视前面的读者, 直接来到这个第一个位置. 这个特殊的第一个位置我们称之为等待位.
我们还是先从简单的情况说起, 假设等待位没有那么特殊, 无论是写者还是读者, 这个等待位都只能容纳一个人, 那么这个等待位和屋子一样, 是个临界资源, 只能互斥访问. 只不过读者和写者到达等待位之后, 没有需要执行的逻辑代码, 每个线程在进入屋子之前, 都要先申请这个等待位. 然后进屋子时, 离开等待位, 需要归还这个等待位. 代码如下:
#include <iostream>
#include <thread>
#include <windows.h>
#include <condition_variable>
#include <mutex>
#include <chrono>
/// <summary>
/// 要读取或写入的数据, 初始值为0
/// </summary>
int num = 0;
/// <summary>
/// 读者的数量
/// </summary>
int reader_count = 0;
std::mutex mtx; //访问num的互斥量
std::mutex reader_count_mtx; //访问reader_count的互斥量
std::mutex wait_pos_mtx; //访问等待位的互斥量
/// <summary>
/// 读者函数
/// </summary>
void reader_fun(int index) {
while (true)
{
wait_pos_mtx.lock(); //申请等待位
reader_count_mtx.lock();
++reader_count;
if (reader_count == 1) {
mtx.lock();
}
reader_count_mtx.unlock(); //归还等待位
wait_pos_mtx.unlock();
printf("读者%d读取数据, 读取的数据: %d\n", index, num);
std::this_thread::sleep_for(std::chrono::seconds(1)); //当前线程阻塞1秒
reader_count_mtx.lock();
--reader_count;
if (reader_count == 0) {
mtx.unlock();
}
reader_count_mtx.unlock();
}
}
/// <summary>
/// 写者函数
/// </summary>
void writer_fun(int index) {
while (true)
{
wait_pos_mtx.lock(); //申请等待位
wait_pos_mtx.unlock(); //归还等待位
mtx.lock();
printf("写者%d写入数据, 写后的数据: %d\n", index, ++num);
std::this_thread::sleep_for(std::chrono::seconds(1)); //当前线程阻塞1秒
mtx.unlock();
}
}
int main()
{
std::thread reader_thread1(reader_fun, 1); //读者线程
std::thread reader_thread2(reader_fun, 2); //读者线程
std::thread writer_thread1(writer_fun, 1); //写者线程
std::thread writer_thread2(writer_fun, 2); //写者线程
reader_thread1.join(); //等待线程结束
reader_thread2.join();
writer_thread1.join();
writer_thread2.join();
return 0;
}
代码10: 两个读者, 两个写者(将等待位看作临界资源)
这里我们暂时先不处理不同的线程调用lock()与unlock()导致报错的问题, 这段代码符合我们的需求吗? 我们试想一下, 如果写者进屋子时归还等待位, 那么写者1进入屋子, 归还等待位, 后来的读者1就可以申请等待位了, 这时写者1还没写完, 又来了一个写者2, 它却无法申请等待位, 因为等待位被读者1申请了, 这就导致所有的写者还没写完读者就可以进行读取操作了, 这不符合我们的需求. 所以写者进屋子时不需要归还等待位. 那什么时候归还等待位呢, 显然是所有的写者都写完之后才归还等待位, 所以我们还需要记录写者的数量writer_count, 在第一个写者进屋屋子之前, 申请等待位; 在最后一个写者离开屋子之后, 归还等待位. 代码如下:
#include <iostream>
#include <thread>
#include <windows.h>
#include <condition_variable>
#include <mutex>
#include <chrono>
/// <summary>
/// 要读取或写入的数据, 初始值为0
/// </summary>
int num = 0;
/// <summary>
/// 读者的数量
/// </summary>
int reader_count = 0;
/// <summary>
/// 写者的数量
/// </summary>
int writer_count = 0;
std::mutex mtx; //访问num的互斥量
std::mutex reader_count_mtx; //访问reader_count的互斥量
std::mutex wait_pos_mtx; //访问等待位的互斥量
/// <summary>
/// 读者函数
/// </summary>
void reader_fun(int index) {
while (true)
{
wait_pos_mtx.lock(); //申请等待位
reader_count_mtx.lock();
++reader_count;
if (reader_count == 1) {
mtx.lock();
}
reader_count_mtx.unlock();
wait_pos_mtx.unlock(); //归还等待位
printf("读者%d读取数据, 读取的数据: %d\n", index, num);
std::this_thread::sleep_for(std::chrono::seconds(1)); //当前线程阻塞1秒
reader_count_mtx.lock();
--reader_count;
if (reader_count == 0) {
mtx.unlock();
}
reader_count_mtx.unlock();
}
}
/// <summary>
/// 写者函数
/// </summary>
void writer_fun(int index) {
while (true)
{
++writer_count;
if (writer_count == 1) {
wait_pos_mtx.lock(); //申请等待位
}
mtx.lock();
printf("写者%d写入数据, 写后的数据: %d\n", index, ++num);
std::this_thread::sleep_for(std::chrono::seconds(1)); //当前线程阻塞1秒
mtx.unlock();
--writer_count;
if (writer_count == 0) {
wait_pos_mtx.unlock(); //归还等待位
}
}
}
int main()
{
std::thread reader_thread1(reader_fun, 1); //读者线程
std::thread reader_thread2(reader_fun, 2); //读者线程
std::thread writer_thread1(writer_fun, 1); //写者线程
std::thread writer_thread2(writer_fun, 2); //写者线程
reader_thread1.join(); //等待线程结束
reader_thread2.join();
writer_thread1.join();
writer_thread2.join();
return 0;
}
代码11: 两个读者, 两个写者(用writer_count记录写者数量)
这段代码不用运行想必就能料到它是有问题的, 因为我们对writer_count的操作没有加锁, 肯定是会出问题的, 我们之前已经遇到过了, 所以我们需要给对writer_count的操作加锁, 代码如下:
#include <iostream>
#include <thread>
#include <windows.h>
#include <condition_variable>
#include <mutex>
#include <chrono>
/// <summary>
/// 要读取或写入的数据, 初始值为0
/// </summary>
int num = 0;
/// <summary>
/// 读者的数量
/// </summary>
int reader_count = 0;
/// <summary>
/// 写者的数量
/// </summary>
int writer_count = 0;
std::mutex mtx; //访问num的互斥量
std::mutex reader_count_mtx; //访问reader_count的互斥量
std::mutex wait_pos_mtx; //访问等待位的互斥量
std::mutex writer_count_mtx; //访问writer_count的互斥量
/// <summary>
/// 读者函数
/// </summary>
void reader_fun(int index) {
while (true)
{
wait_pos_mtx.lock(); //申请等待位
reader_count_mtx.lock();
++reader_count;
if (reader_count == 1) {
mtx.lock();
}
reader_count_mtx.unlock();
wait_pos_mtx.unlock(); //归还等待位
printf("读者%d读取数据, 读取的数据: %d\n", index, num);
std::this_thread::sleep_for(std::chrono::seconds(1)); //当前线程阻塞1秒
reader_count_mtx.lock();
--reader_count;
if (reader_count == 0) {
mtx.unlock();
}
reader_count_mtx.unlock();
}
}
/// <summary>
/// 写者函数
/// </summary>
void writer_fun(int index) {
while (true)
{
writer_count_mtx.lock();
++writer_count;
if (writer_count == 1) {
wait_pos_mtx.lock(); //申请等待位
}
writer_count_mtx.unlock();
mtx.lock();
printf("写者%d写入数据, 写后的数据: %d\n", index, ++num);
std::this_thread::sleep_for(std::chrono::seconds(1)); //当前线程阻塞1秒
mtx.unlock();
writer_count_mtx.lock();
--writer_count;
if (writer_count == 0) {
wait_pos_mtx.unlock(); //归还等待位
}
writer_count_mtx.unlock();
}
}
int main()
{
std::thread reader_thread1(reader_fun, 1); //读者线程
std::thread reader_thread2(reader_fun, 2); //读者线程
std::thread writer_thread1(writer_fun, 1); //写者线程
std::thread writer_thread2(writer_fun, 2); //写者线程
reader_thread1.join(); //等待线程结束
reader_thread2.join();
writer_thread1.join();
writer_thread2.join();
return 0;
}
代码12: 两个读者, 两个写者(给对writer_count的操作加锁)
将wait_pos_mtx.lock(); 与wait_pos_mtx.unlock(); 和mtx.lock(); 与mtx.unlock(); 的代码改写为条件变量的形式, 避免运行报错, 代码如下:
#include <iostream>
#include <thread>
#include <windows.h>
#include <condition_variable>
#include <mutex>
#include <chrono>
/// <summary>
/// 要读取或写入的数据, 初始值为0
/// </summary>
int num = 0;
/// <summary>
/// 读者的数量
/// </summary>
int reader_count = 0;
/// <summary>
/// 写者的数量
/// </summary>
int writer_count = 0;
/// <summary>
/// 访问num的互斥信号量, 初值为1
/// </summary>
int mutex = 1;
/// <summary>
/// 访问等待位的互斥信号量, 初值为1
/// </summary>
int wait_pos_mutex = 1;
std::mutex mtx; //访问num的互斥量
std::mutex reader_count_mtx; //访问reader_count的互斥量
std::mutex wait_pos_mtx; //访问等待位的互斥量
std::mutex writer_count_mtx; //访问writer_count的互斥量
std::condition_variable reader_and_write_cv; //条件变量, 读者和写者线程的管理队列
/// <summary>
/// 读者函数
/// </summary>
void reader_fun(int index) {
while (true)
{
std::unique_lock<std::mutex> wait_pos_lock(wait_pos_mtx);
reader_and_write_cv.wait(wait_pos_lock, []() { return writer_count == 0 && wait_pos_mutex == 1; }); //若写者数量为0且等待位数量为1, 则当前线程可以继续执行
wait_pos_mutex = 0; //申请等待位
reader_count_mtx.lock();
++reader_count;
if (reader_count == 1) {
std::unique_lock<std::mutex> lock(mtx);
reader_and_write_cv.wait(lock, []() { return mutex == 1; }); //若mutex为1, 则当前线程可以继续执行
mutex = 0; //占有锁与钥匙
}
reader_count_mtx.unlock();
wait_pos_mutex = 1;
reader_and_write_cv.notify_all(); //归还等待位
printf("读者%d读取数据, 读取的数据: %d\n", index, num);
std::this_thread::sleep_for(std::chrono::seconds(1)); //当前线程阻塞1秒
reader_count_mtx.lock();
--reader_count;
if (reader_count == 0) {
mutex = 1; //归还锁与钥匙
reader_and_write_cv.notify_all();
}
reader_count_mtx.unlock();
}
}
/// <summary>
/// 写者函数
/// </summary>
void writer_fun(int index) {
while (true)
{
writer_count_mtx.lock();
++writer_count;
if (writer_count == 1) {
std::unique_lock<std::mutex> wait_pos_lock(wait_pos_mtx);
reader_and_write_cv.wait(wait_pos_lock, []() { return wait_pos_mutex == 1; }); //若等待位数量为1, 则当前线程可以继续执行
wait_pos_mutex = 0; //申请等待位
}
writer_count_mtx.unlock();
std::unique_lock<std::mutex> lock(mtx);
reader_and_write_cv.wait(lock, []() { return reader_count == 0 && mutex == 1; }); //若mutex为1, 则当前线程可以继续执行
mutex = 0; //占有锁与钥匙
printf("写者%d写入数据, 写后的数据: %d\n", index, ++num);
std::this_thread::sleep_for(std::chrono::seconds(1)); //当前线程阻塞1秒
mutex = 1; //归还锁与钥匙
reader_and_write_cv.notify_all();
writer_count_mtx.lock();
--writer_count;
if (writer_count == 0) {
wait_pos_mutex = 1; //归还等待位
reader_and_write_cv.notify_all();
}
writer_count_mtx.unlock();
}
}
int main()
{
std::thread reader_thread1(reader_fun, 1); //读者线程
std::thread reader_thread2(reader_fun, 2); //读者线程
std::thread writer_thread1(writer_fun, 1); //写者线程
std::thread writer_thread2(writer_fun, 2); //写者线程
reader_thread1.join(); //等待线程结束
reader_thread2.join();
writer_thread1.join();
writer_thread2.join();
return 0;
}
代码13: 两个读者, 两个写者(引入条件变量避免运行报错)
最终代码如下:
#include <iostream>
#include <thread>
#include <windows.h>
#include <condition_variable>
#include <mutex>
#include <chrono>
/// <summary>
/// 要读取或写入的数据, 初始值为0
/// </summary>
int num = 0;
/// <summary>
/// 读者的数量
/// </summary>
int reader_count = 0;
/// <summary>
/// 写者的数量
/// </summary>
int writer_count = 0;
/// <summary>
/// 访问num的互斥信号量, 初值为1
/// </summary>
int mutex = 1;
/// <summary>
/// 访问等待位的互斥信号量, 初值为1
/// </summary>
int wait_pos_mutex = 1;
std::mutex mtx; //访问num的互斥量
std::mutex reader_count_mtx; //访问reader_count的互斥量
std::mutex wait_pos_mtx; //访问等待位的互斥量
std::mutex writer_count_mtx; //访问writer_count的互斥量
std::condition_variable reader_and_write_cv; //条件变量, 读者和写者线程的管理队列
/// <summary>
/// 读者函数
/// </summary>
void reader_fun(int index) {
while (true)
{
//P(wait_pos_mtx)
std::unique_lock<std::mutex> wait_pos_lock(wait_pos_mtx);
reader_and_write_cv.wait(wait_pos_lock, []() { return writer_count == 0 && wait_pos_mutex == 1; }); //若写者数量为0且等待位数量为1, 则当前线程可以继续执行
wait_pos_mutex = 0; //申请等待位
//P(reader_count_mtx)
reader_count_mtx.lock();
++reader_count;
if (reader_count == 1) {
//P(mtx)
std::unique_lock<std::mutex> lock(mtx);
reader_and_write_cv.wait(lock, []() { return mutex == 1; }); //若mutex为1, 则当前线程可以继续执行
mutex = 0; //占有锁与钥匙
}
//V(reader_count_mtx)
reader_count_mtx.unlock();
//V(wait_pos_mtx)
wait_pos_mutex = 1;
reader_and_write_cv.notify_all(); //归还等待位
//read()
printf("读者%d读取数据, 读取的数据: %d\n", index, num);
std::this_thread::sleep_for(std::chrono::seconds(1)); //当前线程阻塞1秒
//P(reader_count_mtx)
reader_count_mtx.lock();
--reader_count;
if (reader_count == 0) {
//V(mtx)
mutex = 1; //归还锁与钥匙
reader_and_write_cv.notify_all();
}
//V(reader_count_mtx)
reader_count_mtx.unlock();
}
}
/// <summary>
/// 写者函数
/// </summary>
void writer_fun(int index) {
while (true)
{
//P(writer_count_mtx)
writer_count_mtx.lock();
++writer_count;
if (writer_count == 1) {
//P(wait_pos_mutex)
std::unique_lock<std::mutex> wait_pos_lock(wait_pos_mtx);
reader_and_write_cv.wait(wait_pos_lock, []() { return wait_pos_mutex == 1; }); //若等待位数量为1, 则当前线程可以继续执行
wait_pos_mutex = 0; //申请等待位
}
//V(writer_count_mtx)
writer_count_mtx.unlock();
//P(mtx)
std::unique_lock<std::mutex> lock(mtx);
reader_and_write_cv.wait(lock, []() { return reader_count == 0 && mutex == 1; }); //若mutex为1, 则当前线程可以继续执行
mutex = 0; //占有锁与钥匙
//write()
printf("写者%d写入数据, 写后的数据: %d\n", index, ++num);
std::this_thread::sleep_for(std::chrono::seconds(1)); //当前线程阻塞1秒
//V(mtx)
mutex = 1; //归还锁与钥匙
reader_and_write_cv.notify_all();
//P(writer_count_mtx)
writer_count_mtx.lock();
--writer_count;
if (writer_count == 0) {
//V(wait_pos_mutex)
wait_pos_mutex = 1; //归还等待位
reader_and_write_cv.notify_all();
}
//V(writer_count_mtx)
writer_count_mtx.unlock();
}
}
int main()
{
std::thread reader_thread1(reader_fun, 1); //读者线程
std::thread reader_thread2(reader_fun, 2); //读者线程
std::thread writer_thread1(writer_fun, 1); //写者线程
std::thread writer_thread2(writer_fun, 2); //写者线程
reader_thread1.join(); //等待线程结束
reader_thread2.join();
writer_thread1.join();
writer_thread2.join();
return 0;
}
代码14: 读者写者问题(写者优先)
4. 读写公平
写者优先的问题在哪里? 问题在于写者的优先权太高了, 只要屋子里有写者, 屋子外面后来的写者就可以无视前面的读者, 进入等待位. 这也不太合理. 我们希望读者写者的地位是完全平等的, 要做到这点, 我们就需要对等待位加以限制, 它不应该无条件地接纳写者, 而是应该被写者和读者公平地访问. 所以在写者申请等待位的时候, 不用判断写者的数量, 直接把等待位当作普通的临界资源来进行互斥访问. 因此我们可以直接将写者优先中的代码稍作改写, 去掉对writer_count的判断, 即可作为读写公平的代码. 代码如下:
#include <iostream>
#include <thread>
#include <windows.h>
#include <condition_variable>
#include <mutex>
#include <chrono>
/// <summary>
/// 要读取或写入的数据, 初始值为0
/// </summary>
int num = 0;
/// <summary>
/// 读者的数量
/// </summary>
int reader_count = 0;
/// <summary>
/// 访问num的互斥信号量, 初值为1
/// </summary>
int mutex = 1;
/// <summary>
/// 访问等待位的互斥信号量, 初值为1
/// </summary>
int wait_pos_mutex = 1;
std::mutex mtx; //访问num的互斥量
std::mutex reader_count_mtx; //访问reader_count的互斥量
std::mutex wait_pos_mtx; //访问等待位的互斥量
std::condition_variable reader_and_write_cv; //条件变量, 读者和写者线程的管理队列
/// <summary>
/// 读者函数
/// </summary>
void reader_fun(int index) {
while (true)
{
//P(wait_pos_mtx)
std::unique_lock<std::mutex> wait_pos_lock(wait_pos_mtx);
reader_and_write_cv.wait(wait_pos_lock, []() { return wait_pos_mutex == 1; }); //若写者数量为0且等待位数量为1, 则当前线程可以继续执行
wait_pos_mutex = 0; //申请等待位
//P(reader_count_mtx)
reader_count_mtx.lock();
++reader_count;
if (reader_count == 1) {
//P(mtx)
std::unique_lock<std::mutex> lock(mtx);
reader_and_write_cv.wait(lock, []() { return mutex == 1; }); //若mutex为1, 则当前线程可以继续执行
mutex = 0; //占有锁与钥匙
}
//V(reader_count_mtx)
reader_count_mtx.unlock();
//V(wait_pos_mtx)
wait_pos_mutex = 1;
reader_and_write_cv.notify_all(); //归还等待位
//read()
printf("读者%d读取数据, 读取的数据: %d\n", index, num);
std::this_thread::sleep_for(std::chrono::seconds(1)); //当前线程阻塞1秒
//P(reader_count_mtx)
reader_count_mtx.lock();
--reader_count;
if (reader_count == 0) {
//V(mtx)
mutex = 1; //归还锁与钥匙
reader_and_write_cv.notify_all();
}
//V(reader_count_mtx)
reader_count_mtx.unlock();
}
}
/// <summary>
/// 写者函数
/// </summary>
void writer_fun(int index) {
while (true)
{
//P(wait_pos_mutex)
std::unique_lock<std::mutex> wait_pos_lock(wait_pos_mtx);
reader_and_write_cv.wait(wait_pos_lock, []() { return wait_pos_mutex == 1; }); //若等待位数量为1, 则当前线程可以继续执行
wait_pos_mutex = 0; //申请等待位
//P(mtx)
std::unique_lock<std::mutex> lock(mtx);
reader_and_write_cv.wait(lock, []() { return reader_count == 0 && mutex == 1; }); //若mutex为1, 则当前线程可以继续执行
mutex = 0; //占有锁与钥匙
//write()
printf("写者%d写入数据, 写后的数据: %d\n", index, ++num);
std::this_thread::sleep_for(std::chrono::seconds(1)); //当前线程阻塞1秒
//V(mtx)
mutex = 1; //归还锁与钥匙
reader_and_write_cv.notify_all();
//V(wait_pos_mutex)
wait_pos_mutex = 1; //归还等待位
reader_and_write_cv.notify_all();
}
}
int main()
{
std::thread reader_thread1(reader_fun, 1); //读者线程
std::thread reader_thread2(reader_fun, 2); //读者线程
std::thread writer_thread1(writer_fun, 1); //写者线程
std::thread writer_thread2(writer_fun, 2); //写者线程
reader_thread1.join(); //等待线程结束
reader_thread2.join();
writer_thread1.join();
writer_thread2.join();
return 0;
}
代码15: 读者写者问题(读写公平)