关于c++多线程中的互斥锁mutex
- c++中的多线程
- 线程的基本概念
- C++ 标准库中的线程支持
- 多线程与主线程与join
- 换一种方式理解
- 线程互斥锁
- 第一种
- 第二种
- 子线程与互斥锁
- 混合锁--看这个应该就明白了(这个主要使用的是嵌套锁)
- 定义一个类
- 创建线程
- 这个示例主要使用并列锁
- 重点
- 并列锁示例
- 类----锁对象和对应锁空间
- push函数
- pop函数
- 总结
c++中的多线程
C++ 的多线程编程是指在一个程序中同时执行多个线程,以实现并行计算和提高程序的效率。线程是程序执行中的最小单位,每个线程都有自己的执行路径和栈空间,可以并行处理任务。C++ 提供了标准库支持,以简化多线程编程的过程。
线程的基本概念
线程:是轻量级的进程,它共享进程的资源(如内存和文件描述符),但有自己的执行路径和栈。
并发:在同一时间段内有多个线程在执行,可以在多核处理器上真正并行执行,也可以在单核处理器上通过时间片轮转实现并发执行。
同步:多个线程访问共享资源时需要保证数据一致性,这通常通过锁、条件变量等同步机制来实现。
C++ 标准库中的线程支持
C++11 引入了对多线程的标准支持,提供了线程库和同步机制。以下是一些关键组件:
std::thread:用于创建和管理线程。
std::mutex:用于线程间的互斥操作,防止多个线程同时访问共享资源。
std::lock_guard 和 std::unique_lock:用于自动管理互斥锁的加锁和解锁。
std::condition_variable:用于线程间的条件同步,允许线程在某些条件下进行等待和通知。
std::future 和 std::promise:用于线程间的异步任务和结果传递。
多线程与主线程与join
int main()
{
SetConsoleOutputCP(936);
string str = "123";
for (int i = 0; i <3;)
{
/* code */
que.push("e" + string(1,str[i]),ready);
++i;
}
cout << "nihaohiahc" << endl;
// 多线程开发
thread t1(add_element, "e4");
thread t2(await);
//开启线程
t1.join();
cout << "nihao " << endl;
t2.join();
cout << que.size()<<endl;
cout << que.queue_array[0];
cout << que.pop() << endl
<< que.front()<<endl;
system("pause");
system("cls");
}
这个主函数代表的就是主线程
// 多线程开发
thread t1(add_element, "e4");
thread t2(await);
这两个代表的是子线程
一旦创建就开始自动运行
//等待线程
t1.join();
cout << "nihao " << endl;
t2.join();
join代表,主线程开始运行到t1.join的时候,必须得等待t1执行完毕,才能运行cout,但t2是线程。不会被影响,在线程创建的那一刻就开始被执行,同时t2.join代表主线程运行到这行代码也会被要求等待,
总结就是,线程一旦被创建立马就会被运行,主线程遇到join需要听下来等待相应的子线程运行完毕。
所以这里的nihao一定是在t1结束之后才会被输出
换一种方式理解
thread t1(add_element, "e4");
t1.join();
thread t2(await);
在创建t1线程之后,子线程就开始自己跑自己的,主线程发现遇到t1.join那么就会停下来等待它执行完毕,之后主线程才开始创建t2子线程
线程互斥锁
通过包创建的 mutex 对象就是代表一种资源锁,也叫代码锁,也就是从当前写这个锁开始到这个锁生命结束中间的所有代码(资源)都属于这个对象
当然创建锁的方式有两种
第一种
unique_lock<mutex> temp_lock(que.mut);
我这里把que是队列对象,mut是mutex的对象,同时他也是que类的成员变量。
解释: unique_lock代表能够自动关开锁,mut是mutex锁对象,temp_lock是锁的对象,也就是具体资源的对象化。
还是用代码说明吧
void await()
{
//可以手动开解锁
unique_lock<mutex> temp_lock(que.mut);
cout << "开始唤醒11111" << endl;
que.tail_wait.wait(temp_lock);
cout<< "开始唤醒" << endl;
que.tail_wait.notify_one();
}
在这个函数作用域内,我们生成了一个资源锁对象temp_lock,锁的资源就是,直到这个对象死去或者进入等待状态(que.tail_wait.wait(temp_lock);),就会解开锁
锁的名称是mut,资源管理者就是temp_lock,对此等待操作的是资源管理者
第二种
void add_element(string x)
{
Sleep(5000);
que.mut.lock();
// unique_lock<mutex> temp_lock(que.mut);
cout << "开始添加元素之前" << endl;
que.push(x,ready);
que.mut.unlock();
}
直接用锁mut锁住这些资源,然后直到解锁unlock为止
子线程与互斥锁
子线程代表是我们创建的很多个线程函数
互斥锁代表锁进程的对象
因此当线程开始运行的时候,他们都需要去争抢一把线程锁,当一个线程获取到对应的互斥锁,那么该锁下面的其他子线程需要等待,获取到该锁线程释放锁
释放锁的方式有两种:第一进行等待行列。第二生命结束或者被人unlock开锁
混合锁–看这个应该就明白了(这个主要使用的是嵌套锁)
定义一个类
class queue_arary
{
public:
queue_arary(int capacity);
~queue_arary();
void print();
int size()
{
return this->queue_size;
}
bool is_full();
string front();
string pop();
void push(string data,bool& ready);
bool empty();
int queue_capacity;
int queue_size;
int head_index;
string *queue_array;
condition_variable_any tail_wait;// 生产者也就是push的人
condition_variable_any head_wait;// 消费者也就是pop的人
mutex queue_lock;//允许常量成员函数修改变量
mutex mut;
};
其中包含两种锁 mutex queue_lock;//允许常量成员函数修改变量
mutex mut;
包含一个简单的push函数后面用来讲解混合锁
inline void queue_arary::push(string data,bool& ready)
{
// 自动加锁和解锁
unique_lock<mutex> Lock(queue_lock);
// 手动开启关闭
// 保证多线程也不会乱出错
try
{
// 可以让线程进入等待
// 直到队列有空闲位置,避免虚假唤醒
while (this->is_full())
{
// /* code */
// if (this->tail_wait.wait_for(Lock, std::chrono::seconds(1)) == std::cv_status::timeout)
// {
// /* code */
// cout << "Timeout, performing action" << endl;
// return;
ready = true;
// {unique_lock<mutex> notify_lock(this->mut);
this->mut.unlock();
this->tail_wait.notify_all();
tail_wait.wait(Lock);
cout << "Queue is full, wait for pop" << endl;
return;
}
this->queue_array[this->queue_size] = data;
this->queue_size++;
cout<<"push success"<<endl;
// 通知其他线程
head_wait.notify_one();
}
catch (...)
{
// 确保异常情况下也能解锁
throw;
}
}
创建线程
// 多线程开发
thread t1(add_element, "e4");
thread t2(await);
void add_element(string x)
{
Sleep(5000);
que.mut.lock();
// unique_lock<mutex> temp_lock(que.mut);
cout << "开始添加元素之前" << endl;
que.push(x,ready);
}
void await()
{
//可以手动开解锁
unique_lock<mutex> temp_lock(que.mut);
cout << "开始唤醒11111" << endl;
que.tail_wait.wait(temp_lock);
cout<< "开始唤醒" << endl;
que.tail_wait.notify_one();
}
首先采用sleep使得t1的线程缓慢(当然这是我人为的不推荐),争抢不过t2,因此t2获得mut互斥锁
接着t2带着互斥锁mut进行到 que.tail_wait.wait(temp_lock);进入tail_wait的等待空间中,此时因为等待从而失去互斥锁mut
之后t1拿到mut互斥锁的权限que.mut.lock();,进行上锁,开始执行到push函数,但因为add_element函数作用域没有消失,因此t1现在最外面还有一把mut的大锁
接着进入push函数,遇到另一把queue_block的锁,对此mut下面有一把queue_block的锁,直到遇见 this->mut.unlock();,开始释放掉他的mut锁
最重要的一点,释放掉mut锁之后,开始唤醒这个等待空间的线程,线程就会去获取锁,这个时候发现锁只有mut,那么就进行t2线程的唤醒,
当然这个唤醒进程得等queue_block解锁之后才行,也就是进入到等待空间
这个时候t2,获取到锁,开始执行唤醒等待空间的线程,当然这也是要等解锁完成后才可以,
最后t1又获取到queue_block的锁,继续执行直到解锁也就是跳出作用域
流程如下
对此结束
这个示例主要使用并列锁
重点
一个等待空间的唤醒需要在对应锁变量作用域下在能够唤醒对应的等待空间
比如
inline void queue_arary::push(string data)
{
// 自动加锁和解锁
unique_lock<mutex> Lock(this->push_block);
try
{
// 可以让线程进入等待
// 直到队列有空闲位置,避免虚假唤醒
while (this->is_full())
{
// 超时行为
// if (this->tail_wait.wait_for(Lock, std::chrono::seconds(4)) == std::cv_status::timeout)
// {
// /* code */
// cout << "Timeout, performing action" << endl;
// return;
// }
tail_wait.wait(Lock);
}
this->queue_array[this->tail_index] = data;
this->tail_index = (this->tail_index + 1) % this->queue_capacity;
this->qsize++;
cout << "push success" << endl;
// 通知其他线程
//只使用空的时候进行加锁解锁和通知
if (this->qsize == 1)
{
/* code */
this->push_block.unlock();
this->pop_block.lock();
this->head_wait.notify_one();
this->pop_block.unlock();
}
if (this->qsize<this->queue_capacity)
{
/* code */
this->tail_wait.notify_one();
}
}
catch(...)
{
// 确保异常情况下也能解锁
throw;
}
}
其中unique_lock Lock(this->push_block);是push_block的锁
要想唤醒 另一个等待空间,需要创建对应的锁作用域
this->push_block.unlock();
this->pop_block.lock();
this->head_wait.notify_one();
this->pop_block.unlock();
并列锁示例
结合上面的重点,观察锁作用域和等待唤醒空间
类----锁对象和对应锁空间
class queue_arary
{
public:
queue_arary(int capacity);
~queue_arary();
void print();
int size()
{
return this->qsize;
}
bool is_full();
string front();
string pop();
void push(string data);
bool empty();
int queue_capacity;
int head_index;
int tail_index;
string *queue_array;
//原子变量
atomic<int> qsize;
//等待空间
condition_variable_any tail_wait; // 生产者也就是push的人
condition_variable_any head_wait; // 消费者也就是pop的人
//队列锁
mutex push_block; // 允许常量成员函数修改变量
mutex pop_block; // 允许常量成员函数修改变量
};
其中的锁对象与对应的唤醒空间如下
//等待空间
condition_variable_any tail_wait; // 生产者也就是push的人
condition_variable_any head_wait; // 消费者也就是pop的人
//队列锁
mutex push_block; // 允许常量成员函数修改变量
mutex pop_block; // 允许常量成员函数修改变量
关于并列锁,我们在使用的时候就是防止死锁,对此在使用的时候,要提前解锁,方便唤醒其他的等待空间
push函数
inline void queue_arary::push(string data)
{
// 自动加锁和解锁
unique_lock<mutex> Lock(this->push_block);
try
{
// 可以让线程进入等待
// 直到队列有空闲位置,避免虚假唤醒
while (this->is_full())
{
tail_wait.wait(Lock);
}
this->queue_array[this->tail_index] = data;
this->tail_index = (this->tail_index + 1) % this->queue_capacity;
this->qsize++;
cout << "push success" << endl;
// 通知其他线程
//只使用空的时候进行加锁解锁和通知
if (this->qsize == 1)
{
/* code */
this->push_block.unlock();
this->pop_block.lock();
this->head_wait.notify_one();
this->pop_block.unlock();
}
if (this->qsize<this->queue_capacity)
{
/* code */
this->tail_wait.notify_one();
}
}
catch(...)
{
// 确保异常情况下也能解锁
throw;
}
}
其中锁对象unique_lock Lock(this->push_block);等待空间为 tail_wait.wait(Lock);
我们需要唤醒的是pop函数中的等待空间,
表示只要我们添加了一个push元素在队列中,那么就可以先解锁当前push_block在唤醒pop_block的等待空间(需要包含在锁作用域下面)
pop函数
inline string queue_arary::pop()
{
unique_lock<mutex> Lock(this->pop_block);
// 手动开启关闭
while (this->qsize == 0)
{
head_wait.wait(Lock);
}
// 利用copy的方法进行元素的移动
// std:: this_thread::sleep_for(chrono::seconds(10));
string temp = this->queue_array[this->head_index];
this->queue_array[this->head_index] = "";
this->head_index = (this->head_index + 1) % this->queue_capacity;
this->qsize--;
// 通知其他线程
if (this->qsize!=0)
{
/* code */
head_wait.notify_one();
}
//一次pop唤醒所有push,从满到不满
if(this->qsize+1==this->queue_capacity)
{
pop_block.unlock();
push_block.lock();
tail_wait.notify_one();
push_block.unlock();
}
return temp;
当pop_block的等待空间被唤醒的时候,一次的pop可以唤醒下面的自己 head_wait.notify_one();的等待空间所有pop线程,
if (this->qsize!=0)
{
/* code */
head_wait.notify_one();
}
而对应的当一次pop可以解锁,在锁作用域下面唤醒对应的等待空间,也就是push_block空间
同理在push函数中,一次被唤醒,可以唤醒自己所有等待空间的push_block线程
if (this->qsize<this->queue_capacity)
{
/* code */
this->tail_wait.notify_one();
}
总结
嵌套锁一定要在把所有的锁从里到外一个一个解锁才行(最里面的解锁一般是变成等待空间之后会释放锁,这个时候瞬间执行,区间代码,也就是mut.unlock()这样解开最外面的锁)
并列锁一定要在调用另一个等待唤醒空间的时候,进行解锁,在创建锁作用域,方便唤醒对应的锁等待空间
至此希望能够对大家有所帮助,如有不懂的,评论区见