【并发编程十】c++线程同步——条件变量(condition_variable)
- 一、互斥
- 二、条件变量
- 1、为何要引入条件变量?
- 2、不使用条件变量
- 3、使用条件变量
- 3.1、互斥锁有什么问题?
- 3.2、条件变量
- 3.3、条件变量成员函数
- 3.4、demo
- 三、future
- 四、信号量
简介:
上一篇文章,我们讲解了【并发编程九】c++线程同步——互斥(mutex)。本篇文章,我们详细的介绍下c++标准库提供的线程同步方法——条件变量(condition_variable)。
参考:
1、https://www.apiref.com/cpp-zh/cpp/thread.html
2、https://en.cppreference.com/w/cpp/thread
3、书籍《c++服务器开发精髓》——张远龙
一、互斥
参见【并发编程九】c++线程同步——互斥(mutex)
二、条件变量
1、为何要引入条件变量?
- 例子
在一条生产线上有一个仓库,当生产者生产时需要锁住仓库独占,而消费者去产品时也需要锁住仓库独占。
如果,生产者发现仓库满了,那么他就不能生产了,编程了阻塞状态。但是此时生产者独占仓库,消费者又无法进入仓库消耗产品,这样就造成了一个僵死的状态。
我们需要一种机制,当互斥量被锁住以后发现当前线程还是无法完成自己的操作,那么它应该释放互斥量,让其他线程哦工作。
- 1、可以采用轮询的方式,不停的查询你需要的条件。
- 2、让系统来帮你查询条件,使用条件变量。
2、不使用条件变量
- demo
#include<iostream>
#include <thread>
#include<mutex>
#include<deque>
#include<chrono>
using namespace std;
mutex mtx;
deque<int> q;
//线程A,producer
void task1()
{
int i = 0;
while(true)
{
unique_lock<mutex> lock(mtx);
if (q.size() < 1000)
{
if (i < 999)
{
q.push_back(i);
i++;
}
else
{
i = 0;
}
}
else
{
// std::this_thread::sleep_for(std::chrono::seconds(1));;
}
}
}
//线程B,consumer
void task2()
{
int da = 0;
while (true)
{
unique_lock<mutex> lock(mtx);
if (!q.empty())
{
da = q.front();
q.pop_front();
cout << "get value from que:" << da << endl;
cout << "que.size:" << q.size()<<endl;
}
}
}
int main()
{
cout << "que.size:" << q.size() << endl;
thread t2(task2);
thread t1(task1);
t1.join();
t2.join();
}
- 输出
3、使用条件变量
3.1、互斥锁有什么问题?
- 尝试获取锁的人会一直等待,浪费cpu资源。(功耗和性能浪费)
3.2、条件变量
- 提供睡眠/唤醒机制,避免无意义的等待。
条件变量是允许多个线程相互交流的同步原语。它允许一定量的线程等待(可以定时)另一线程的提醒,然后再继续。条件变量始终关联到一个互斥。
定义于头文件 <condition_variable>
3.3、条件变量成员函数
- 通知
通知成员函数 | 解释 |
---|---|
notify_one | 通知一个等待的线程(公开成员函数) |
notify_all | 通知所有等待的线程(公开成员函数) |
- 等待
等待成员函数 | 解释 |
---|---|
wait | 阻塞当前线程,直到条件变量被唤醒(公开成员函数) |
wait_for | 阻塞当前线程,直到条件变量被唤醒,或到指定时限时长后(公开成员函数) |
wait_until | 阻塞当前线程,直到条件变量被唤醒,或直到抵达指定时间点(公开成员函数) |
简单说下,如果是新人,简单理解wait和notify_one两个函数就行了,基本就明白了条件变量的原理,如下面的demo,wait就是等待notify的通知后再执行
3.4、demo
#include<iostream>
#include <thread>
#include<mutex>
#include<deque>
#include<chrono>
#include<condition_variable>
using namespace std;
mutex mtx;
deque<int> q;
condition_variable cv;
//线程A,producer
void task1()
{
int i = 0;
while(true)
{
unique_lock<mutex> lock(mtx);
if (q.size() < 1000)
{
if (i < 99)
{
q.push_back(i);
cv.notify_one();//cv.notify_all();
i++;
}
else
{
i = 0;
}
}
else
{
cv.notify_one();
//std::this_thread::sleep_for(std::chrono::seconds(1));;
}
}
}
//线程B,consumer
void task2()
{
int da = 0;
while (true)
{
unique_lock<mutex> lock(mtx);
if (q.empty())//如果有多个消费者,此处应该为while(q.empty())
{
cv.wait(lock);
}
da = q.front();
q.pop_front();
cout << "get value from que:" << da << endl;
cout << "que.size:" << q.size() << endl;
}
}
int main()
{
cout << "que.size:" << q.size() << endl;
thread t2(task2);
thread t1(task1);
t1.join();
t2.join();
}
- 输出
- cpu占用率
三、future
(书写中。。。还未完成)
四、信号量
(书写中。。。还未完成)