文章目录
- 1.condition_variable
- 2.notify_one()和notify_all()
- 3.wait()
- 3.1 没有第二个参数:while + wait()
- 3.2 有第二个参数:wait() + lambda
需要注意的是,条件变量condition_variable要和互斥锁mutex搭配起来使用。
1.condition_variable
condition_variable 是一个类,其部分源码如下图所示:
2.notify_one()和notify_all()
notify_one():通知一个处于等待状态的线程。
notify_all():通知所有处于等待状态的线程。
3.wait()
3.1 没有第二个参数:while + wait()
首先使用 unique_lock 给互斥量加锁。
如果 while 的循环条件是 false,则不进入 while 循环内,那么流程继续往下执行(此时互斥量是锁着的)。
如果 while 的循环条件是 true,则进入 while 循环内,那么 wait() 将解锁互斥量,并进入等待状态,一直等到其它某个线程调用 notify_one()/notify_all() 通知为止。
当收到其它某个线程的通知后,便从等待状态进入阻塞状态,此时 wait() 不断尝试重新获取互斥锁,如果获取不到,那么就阻塞在这里等待获取;如果获取到了,那么 wait() 返回,接着判断 while 的循环条件:
-
如果 while 的循环条件是 false,则不进入 while 循环内,那么流程继续往下执行(此时互斥量是锁着的)。
-
如果 while 的循环条件是 true,则进入 while 循环内,那么 wait() 将解锁互斥量,并进入等待状态,一直等到其它某个线程调用 notify_one()/notify_all() 通知为止。
场景:生产者生产一个物品,通知消费者消费一个物品;消费完了,消费者再通知生产者继续生产一个物品。
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
using namespace std;
std::mutex mtx; // 定义互斥锁
std::condition_variable cv; // 定义条件变量
class Queue
{
public:
void put(int val) // 生产一个物品
{
unique_lock<std::mutex> unilck(mtx);
while (!que.empty())
{
cv.wait(unilck);
}
que.push(val);
cv.notify_all();
cout << "put - " << val << endl;
}
int get() // 消费一个物品
{
unique_lock<std::mutex> lck(mtx);
while (que.empty())
{
cv.wait(lck);
}
int val = que.front();
que.pop();
cv.notify_all();
cout << "get - " << val << endl;
return val;
}
private:
queue<int> que; // C++中STL所有的容器都不是线程安全的
};
void producer(Queue* que) // 生产者线程入口函数
{
for (int i = 1; i <= 100; ++i)
{
que->put(i);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
void consumer(Queue* que) // 消费者线程入口函数
{
for (int i = 1; i <= 100; ++i)
{
que->get();
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
int main()
{
Queue que; // 生产者线程和消费者线程共享的队列
std::thread t1(producer, &que); // 创建了线程,线程入口函数是producer(),生产者线程开始执行
std::thread t2(consumer, &que); // 创建了线程,线程入口函数是consumer(),消费者线程开始执行
t1.join();
t2.join();
return 0;
}
3.2 有第二个参数:wait() + lambda
首先使用 unique_lock 给互斥量加锁。
如果 lambda 表达式的返回值是 true,那么 wait() 直接返回,流程继续往下执行(此时互斥量是锁着的)。
如果 lambda 表达式的返回值是 false,那么 wait() 将解锁互斥量,并进入等待状态,一直等到其它某个线程调用 notify_one()/notify_all() 通知为止。
当收到其它某个线程的通知后,便从等待状态进入阻塞状态,此时 wait() 不断尝试重新获取互斥锁,如果获取不到,那么就阻塞在这里等待获取;如果获取到了,接着判断这个 lambda 表达式:
-
如果 lambda 表达式的返回值是 true,那么 wait() 直接返回,流程继续往下执行(此时互斥量是锁着的)。
-
如果 lambda 表达式的返回值是 false,那么 wait() 将解锁互斥量,并进入等待状态,一直等到其它某个线程调用 notify_one()/notify_all() 通知为止。
场景:生产者生产一个物品,通知消费者消费一个物品;消费完了,消费者再通知生产者继续生产一个物品。
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
using namespace std;
std::mutex mtx; // 定义互斥锁
std::condition_variable cv; // 定义条件变量
class Queue
{
public:
void put(int val) // 生产一个物品
{
unique_lock<std::mutex> unilck(mtx);
cv.wait(unilck, [this] { return que.empty(); });
que.push(val);
cv.notify_all();
cout << "put - " << val << endl;
}
int get() // 消费一个物品
{
unique_lock<std::mutex> lck(mtx);
cv.wait(lck, [this] { return !que.empty(); });
int val = que.front();
que.pop();
cv.notify_all();
cout << "get - " << val << endl;
return val;
}
private:
queue<int> que; // C++中STL所有的容器都不是线程安全的
};
void producer(Queue* que) // 生产者线程入口函数
{
for (int i = 1; i <= 100; ++i)
{
que->put(i);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
void consumer(Queue* que) // 消费者线程入口函数
{
for (int i = 1; i <= 100; ++i)
{
que->get();
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
int main()
{
Queue que; // 生产者线程和消费者线程共享的队列
std::thread t1(producer, &que); // 创建了线程,线程入口函数是producer(),生产者线程开始执行
std::thread t2(consumer, &que); // 创建了线程,线程入口函数是consumer(),消费者线程开始执行
t1.join();
t2.join();
return 0;
}