C++11多线程编程 一:多线程概述
C++11多线程编程 二:多线程通信,同步,锁
C++11多线程编程 三:锁资源管理和条件变量
3.1 手动实现RAii管理mutex资源锁自动释放
自己写的代码一般都是自己上锁,自己进行释放,但是这样特别容易忘记释放,进而造成死锁(可以理解为一直没有被释放的锁),为了确保每一个锁都能得到释放,所以就有了这样一个技术,叫做RAii资源获取既初始化,特性就是使用局部对象来管理资源,局部对象我们都知道,他一旦出栈就会得到释放,在一对大括号之中的代码,里面生成的变量和对象,一旦出了这对大括号,他就会得到释放,若是类的对象的话,他会调用析构函数进行释放,这部分代码我们称为是在栈中生成的空间,所以他们的运行周期由操作系统来维护。
使用局部对象来管理资源的技术称为资源获取即初始化;它的生命周期是由操作系统来管理的,无需人工介入;资源的销毁容易忘记,造成死锁或内存泄漏。
#include <thread>
#include <iostream>
#include <string>
#include <mutex>
#include <shared_mutex>
//Linux -lpthread
using namespace std;
// RAII 这个是把上锁和解锁的过程放在类当中
class XMutex
{
public:
XMutex(mutex& mux) :mux_(mux)
{
cout << "Lock" << endl;
mux.lock();
}
~XMutex()
{
cout << "Unlock" << endl;
mux_.unlock();
}
private:
mutex& mux_;
};
static mutex mux;
void TestMutex(int status)
{
XMutex lock(mux);
if (status == 1)
{
cout << "=1" << endl;
return;
}
else
{
cout << "!=1" << endl;
return;
}
}
int main(int argc, char* argv[])
{
TestMutex(1);
TestMutex(2);
getchar();
return 0;
}
类名后面跟着一个lock(mux),lock其实是一个对象,并且给有参构造函数传递了一个实参。
3.2 c++11自带的RAIl控制锁lock guard
#include <thread>
#include <iostream>
#include <string>
#include <mutex>
#include <shared_mutex>
//Linux -lpthread
using namespace std;
static mutex gmutex; //这里可以是任意的互斥变量类型,只要里面包含lock函数的都可以
void TestLockGuard(int i)
{
gmutex.lock();
{
//假如外部已经有锁,那就不lock
lock_guard<mutex> lock(gmutex, adopt_lock);
//出了大括号,调用析构函数,释放锁
}
{
lock_guard<mutex> lock(gmutex);
cout << "begin thread " << i << endl;
}
for (;;)
{
{
lock_guard<mutex> lock(gmutex);
cout << "In " << i << endl;
}
this_thread::sleep_for(500ms);
}
}
int main(int argc, char* argv[])
{
for (int i = 0; i < 3; i++)
{
thread th(TestLockGuard, i + 1);
th.detach();
}
getchar();
return 0;
}
3.3 unique_lock 可临时解锁控制超时的互斥体
在实际需求当中会出现几种情况,也就是可能会出现移动赋值,也就是把一个锁赋值给另一个锁,可能出现在对象的赋值的过程当中,若你想要支持这种转移,那就要使用unique_lock,这是另一种锁管理的工具,前面我们是使用大括号来控制的,如果在代码中,由于业务逻辑的需要,需要先解锁,后面又要再加锁,可以使用提供的接口进行手动解锁,然后再加锁,最后由析构函数进行释放锁,同时还支持更加复杂的情况,
unique_lock C++11 实现可移动的互斥体所有权包装器
支持临时释放锁 unlock
支持 adopt_lock(已经拥有锁,不加锁,出栈区会释放)
支持 defer_lock(延后拥有,不加锁,出栈区不释放)
支持 try_to_lock 尝试获得互斥的所有权而不阻塞 ,获取失败退出栈区不会释放,通过owns_lock()函数判断
支持超时参数,超时不拥有锁
而后面释放锁资源的时候,会先判断是否拥有锁资源,代码如下:
尝试加锁的源码:
由红方框里面可知,try_lock()函数会返回TRUE或者FALSE,所以就变成和上面两个一样的了。
#include <thread>
#include <iostream>
#include <string>
#include <mutex>
#include <shared_mutex>
//Linux -lpthread
using namespace std;
int main(int argc, char* argv[])
{
{
static mutex mux;
{
unique_lock<mutex> lock(mux);//创建lock对象的时候就已经加锁了
lock.unlock();//可以临时释放锁
lock.lock(); //也可以临时加锁
}
{
//已经拥有锁 就不再锁定了,退出栈区解锁
mux.lock();
unique_lock<mutex> lock(mux, adopt_lock);
}
{
//延后加锁 不拥有 退出栈区不解锁
unique_lock<mutex> lock(mux, defer_lock);
//后面需要我们主动的去加锁 退出栈区解锁
lock.lock();
}
{
//mux.lock();
//尝试加锁 不阻塞 失败不拥有锁(退出栈区不解锁) 成功的话就拥有锁
unique_lock<mutex> lock(mux, try_to_lock);
if (lock.owns_lock())
{
cout << "owns_lock" << endl;
}
else
{
cout << "not owns_lock" << endl;
}
}
}
return 0;
}
3.4 C++14 shared lock共享锁包装器
#include <thread>
#include <iostream>
#include <string>
#include <mutex>
#include <shared_mutex>
//Linux -lpthread
using namespace std;
int main(int argc, char* argv[])
{
{
//共享锁
static shared_timed_mutex tmux;
//读取锁 共享锁
{
shared_lock<shared_timed_mutex> lock(tmux);//这行代码调用共享锁
cout << "read data" << endl;
//退出栈区 释放共享锁
}
//写入锁 互斥锁
{
unique_lock<shared_timed_mutex> lock(tmux);
cout << "write data" << endl;
}
}
getchar();
return 0;
}
3.5 c++17 scoped_lock 解决互锁造成的死锁
这个封装器自在C++17中支持,记得设置C++17
死锁问题:如下图所示,线程 A 持有资源 2,线程 B 持有资源 1,在各自的锁没有解开的情况下,他们同时申请对方的资源,由于得不到锁资源,这两个线程就会互相等待而进入死锁状态。
模拟死锁
解决死锁问题:其他代码都不动,只修改TestScope1()的代码即可
#include <thread>
#include <iostream>
#include <string>
#include <mutex>
#include <shared_mutex>
//Linux -lpthread
using namespace std;
static mutex mux1;
static mutex mux2;
void TestScope1()
{
//模拟死锁 停100ms等另一个线程锁mux2
//这里只是使用sleep函数模拟业务逻辑,真正的业务当中不应该有sleep,因为这个是占用CPU资源的。
this_thread::sleep_for(100ms);
cout << this_thread::get_id() << " begin mux1 lock" << endl;
//mux1.lock();
cout << this_thread::get_id() << " begin mux2 lock" << endl;
//mux2.lock(); //死锁
//c++11
//这种,它必须两个同时锁住,才会进行下一步操作,如果没有会释放锁,这样就不会占用到两个锁
//lock(mux1, mux2);
//c++17
scoped_lock lock(mux1, mux2); // 解决死锁,可以传多个锁。
cout << "TestScope1" << endl;
this_thread::sleep_for(1000ms);
//mux1.unlock();
//mux2.unlock();
}
//上锁的顺序:2->1->2->1
void TestScope2()
{
cout << this_thread::get_id() << " begin mux2 lock" << endl;
mux2.lock();
this_thread::sleep_for(500ms);
cout << this_thread::get_id() << " begin mux1 lock" << endl;
mux1.lock();//死锁
cout << "TestScope2" << endl;
this_thread::sleep_for(1500ms);
mux1.unlock();
mux2.unlock();
}
int main(int argc, char* argv[])
{
{
//演示死锁情况
{
thread th(TestScope1);
th.detach();
}
{
thread th(TestScope2);
th.detach();
}
}
getchar();
return 0;
}
3.7 条件变量应用场景生产者消费者信号处理
生产者-消费者模型
生产者和消费者共享资源变量(list队列),生产者生产一个产品,通知消费者消费,消费者阻塞等待信号-获取信号后消费产品(取出list队列中数据)
核心的关键点,生产者和消费者是在不同的线程的,并且可能有多个生产者和多个消费者,他们之间怎么去通信,甚至去均衡多个线程去分配他们去怎么做?
之前的生产者和消费者是怎么做的?生产者发出数据之后,消费者按照固定的时延进行接收,两者之间是相互独立的,而现在是想,一旦当生产者生产出数据之后,就立马通知消费者去处理。而这个通知就是信号量,有了这个之后,可以使得消费者阻塞在那儿,而阻塞是不占用CPU资源的,通过信号量通知消费者解除阻塞。
生产者模型步骤:
准备好信号量:std::condition_variable cv;
1 获得 std::mutex (常通过 std::unique_lock ): unique_lock lock(mux);
2 在获取锁时进行修改: msgs_.push_back(data);
3 释放锁并通知读取线程:lock.unlock();
cv.notify_one(); //通知一个等待信号线程
cv.notify_all(); //通知所有等待信号线程
消费者模型步骤:
1 获得与改变共享变量线程共同的mutex:unique_lock lock(mux);
2 wait() 等待信号通知:
2.1 无lambada 表达式
2.2 lambada 表达式 cv.wait(lock, [] {return !msgs_.empty();});
3.8 condition variable代码示例读写线程
模仿一个写入线程多个读取线程
当改成cv.notify_all();的时候,运行结果是这样的:
也就是三个线程都有响应,但是还是只有一个进入读取。
#include <thread>
#include <iostream>
#include <mutex>
#include <list>
#include <string>
#include <sstream>
#include <condition_variable> // std::condition_variable
using namespace std;
list<string> msgs_;
mutex mux;
condition_variable cv;
void ThreadWrite()
{
for (int i = 0;; i++)
{
stringstream ss;
ss << "Write msg " << i; //字符串拼接,把"Write msg "和i拼在一起,并存放在ss中
unique_lock<mutex> lock(mux); //在这里加锁,确保锁住
msgs_.push_back(ss.str()); //ss.str()<=>ss.data()都返回当前字符串的内容
//这里为什么要先进行解锁呢?假如这里没有解锁,那么notify_one在调用的时候,在下面
//读取线程中的wait函数,wait他需要先锁定,而这里的又没有释放,那么程序就会阻塞,就会造成死锁
lock.unlock(); //释放锁
cv.notify_one(); //先解完锁后,发送信号,通知一个读线程进入
//cv.notify_all(); //即使发送通知信号给所有线程,也只有一个线程能进入
this_thread::sleep_for(3s); //每3s写一个数据,然后通知读取线程去处理
}
}
void ThreadRead(int i)
{
for (;;)
{
cout << "开始读取数据" << endl;
unique_lock<mutex> lock(mux);//加锁
//这个wait什么时候解除阻塞,需要等到cv.notify_one();按次序通知到他,
//如果是notify_all,所有在wait的线程都会返回"开始读取数据",最终还是只能有一个线程能进入读取
//如果是notify_one就只有一个会返回"开始读取数据",最终还是只能有一个线程能进入读取
cv.wait(lock, [i]
{
cout << "线程 " << i << " 等待" << endl;
//msgs_.empty()当是空的时候返回TRUE,TRUE的话相当于阻塞是不应该进行的,看wait源码可知
//return true;//当一直返回TRUE的话,不管有无信号wait都不会阻塞,而继续往下执行
//当一直返回FALSE的话,没有信号的话连这个wait都不进入
//有信号会一直阻塞在这儿,并按照信号间隔进入此wait函数
//return false;
return !msgs_.empty();
});
//获取信号后锁定
while (!msgs_.empty())
{
cout << "线程 " << i << " 开始读取数据" << msgs_.front() << endl;
msgs_.pop_front();
}
}
}
void threadread(int i)//比较简单的演示
{
for (;;)
{
cout << "read msg" << endl;
unique_lock<mutex> lock(mux);//加锁
cv.wait(lock);//是先解锁、阻塞等待信号,获取信号会锁定,所以做消息的处理是线程安全的
//获取信号后锁定
while (!msgs_.empty())
{
cout << "thread " << i << " read mumber " << msgs_.front() << endl;
msgs_.pop_front();
}
}
}
int main(int argc, char* argv[])
{
thread th(ThreadWrite);
th.detach();
for (int i = 0; i < 3; i++)
{
thread th(ThreadRead, i + 1);//带lambda表达式的wait
//thread th(threadread, i + 1);//单纯的wait
th.detach();
}
getchar();//放在这里面,阻塞住当前的操作
return 0;
}