目录
- 1. 条件变量
- 2. 生产者消费者模型
- 2.1 概念
- 3. 基于BlockingQueue的生产者消费者模型
- 3.1 概念
- 3.2 等待函数
- 3.3 等待函数的功能
- 3.4 唤醒函数
- 4. 模型复盘
- 5. 总代码
1. 条件变量
当一个线程互斥地访问某个变量或者临界资源时,它可能发现在其它线程改变状态之前,它什么也做不了。
例如一个线程访问队列时,发现队列为空,它只能等待,直到其它线程将一个节点添加到队列中。
所以一般只有锁的情况下,我们很难知道临界资源的状态。
那么此时就引入了一种策略叫做条件变量,可以让用户知道临界资源的状态。
- 条件变量相关函数:
cond:要初始化的条件变量;attr:NULL
- 创建了条件变量,还需要在某个条件变量下等待:
mutex:互斥量
- 为什么pthread_ cond_ wait 需要互斥量?
条件等待是线程间同步的一种手段,如果只有一个线程,条件不满足,一直等下去都不会满足,所以必须要有一个线程通过某些操作,改变共享变量,使原先不满足的条件变得满足,并且友好的通知等待在条件变量上的线程。
条件不会无缘无故的突然变得满足了,必然会牵扯到共享数据的变化。所以一定要用互斥锁来保护。没有互斥锁就无法安全的获取和修改共享数据。
更通俗地说,就是需要其他线程来改变等待的条件,使得正在等待的线程不会阻塞在条件变量下。 但是这个改变的过程就是访问临界区,所以需要加锁来保证线程安全;所以需要互斥量的保护。
- 创建和等待有了,还需要有能通知用户的方法:
这个signal函数的作用是唤醒在该条件变量下等待的第一个线程,由于会有多个线程等待,所以会有一个等待队列,这个队列在条件变量中,每次唤醒的是该队列里的第一个线程。
broadcast代表唤醒所有线程。
写一个程序,让一个线程控制另一个线程:
#include <iostream>
#include <cstdio>
#include <string>
#include <unistd.h>
#include <pthread.h>
using namespace std;
pthread_mutex_t mtx;//互斥锁
pthread_cond_t cond;//条件变量
//让ctrl控制work线程,让它定期运行
void* ctrl(void* args)
{
string name = (char*)args;
while(true){
cout<< "master says begin work...." <<endl;
pthread_cond_signal(&cond);
sleep(1);
}
}
void* work(void* args)
{
int number = *(int*)args;
delete (int*)args;
while(true){
//worker线程进来首先要等待命令
pthread_cond_wait(&cond,&mtx);
cout<< "worker:" << number << " is working...."<<endl;
}
}
#define NUM 3
int main()
{
pthread_mutex_init(&mtx,nullptr);
pthread_cond_init(&cond,nullptr);
pthread_t master;
pthread_t worker[NUM];
pthread_create(&master,nullptr,ctrl,(void*)"master");
for(int i = 0;i < NUM;i++){
int* number = new int(i);
pthread_create(worker+i,nullptr,work,(void*)number);
}
//线程等待
for(int i = 0;i < NUM;i++){
pthread_join(worker[i],nullptr);
}
pthread_join(master,nullptr);
pthread_mutex_destroy(&mtx);
pthread_cond_destroy(&cond);
return 0;
}
运行结果:
这也就验证了上面说的等待队列的问题;因为三个线程每次运行完以后,都会再次以这个顺序进行等待,这不是巧合,这是等待队列产生的效果。
更深层次地,可以知道条件变量应该就是一个结构体,里面不仅包含了某些数据,还包含了一个等待队列。
将signal改为broadcast以后,将一次唤醒所有进程:
pthread_cond_broadcast(&cond);
运行结果:
2. 生产者消费者模型
2.1 概念
在这个模型中,生产者消费者其实都是一个个的线程,由于生产者与消费者之间有非常强的耦合关系,所以需要有一个方法来使其进行解耦。
生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。
在日常生活中,生产者可以理解为供货商,消费者可以理解为顾客,缓冲区理解为超市。
那么这个超市就是一个临界资源,需要被保护起来。
口头来说,这个模型就是“321”:3代表三种关系,分别是生产者vs生产者、生产者vs消费者、消费者vs消费者;2代表两种角色,也就是两种执行流,分别是生产者和消费者;1代表一种交易场所,也就是一段缓冲区。
3. 基于BlockingQueue的生产者消费者模型
3.1 概念
BlockingQueue 在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)
- 当生产满了的时候,就应该不要生产了(不要竞争锁了),而应该让消费者来消费;
- 当消费空了,就不应该消费(不要竞争锁了),让生产者来进行生产。
首先定义阻塞队列:
class BlockQueue
{
const int default_cap = 5;//自定义队列元素缺省值
private:
std::queue<T> bq_;//阻塞队列
int capacity_; //队列的元素上限
pthread_mutex_t mtx_;//保护临界资源的锁
//bq_空了,就让消费者在该条件变量下等待
//bq_满了,就让生产者在该条件下等待
pthread_cond_t full_;//条件为满的条件变量
pthread_cond_t empty_;//条件为空的条件变量
}
bq_是阻塞队列,用来进行模拟生产者消费者数据的消费与生产。
基本的思路就是:在有数据被生产的时候让消费者进行消费,在有数据被消费的时候让生产者进行生产。
- 那么消费者和生产者各自怎么能知道自己什么时候该干活?
只有生产者才知道消费者什么时候该消费,也只有消费者知道生产者什么时候该生产。 可以这么理解:生产者提供了产品,它就知道消费者需要消费了,而消费者消耗了产品,它就知道生产者需要生产了,这并不是交易场所的任务,而是双方的任务。
也就是说,在有数据被生产的时候,由生产者来通知消费者过来消费;在有数据被消费了的时候,由消费者通知生产者过来生产:
//输入型参数用const T&
void Push(const T& in)
{
//操作临界区先上锁
LockQueue();
//向队列中放数据,生产函数
if(isFull()){
//如果队列满,就让生产者挂起,停止生产
ProducterWait();
}
bq_.push(in);
//走到这里肯定有数据被生产
//虽队列不满,但是已有数据可消费,唤醒消费者
WakeupConsumer();
UnLockQueue();
}
//输出型参数用T*
void Pop(T* out)
{
//向队列中拿数据,消费函数
LockQueue();
if(isEmpty()){
//队列是空的,无法消费
ConsumerWait();
}
*out = bq_.front();
bq_.pop();
//走到这里肯定消费了数据
//那么此时适合生产,唤醒生产者
WakeupProducter();
UnLockQueue();
}
这是基本的思路,其中等待函数和唤醒函数的封装如下:
3.2 等待函数
等待函数的意义是在不适合进行对应操作的时候(比如队列已满,生产者不必继续生产),让该对象进行挂起等待,直到有条件继续进行操作
对于生产者来说,它需要等待的情况只有队列已满,所以说它要恢复生产,就要等队列不为满,要去full_的条件变量下等待
对于消费者来说,它需要等待的情况只有队列为空,所以说它要恢复消费,就要等队列不为空,要去empty_的条件变量下等待
void ProducterWait()
{
pthread_cond_wait(&empty_,mtx_);
}
void ConsumerWait()
{
pthread_cond_wait(&full_,mtx);
}
3.3 等待函数的功能
这里有个问题值得注意:线程是带着锁进入等待的,那如果它一直等不到条件满足,那别的线程岂不是无法拿到锁资源了?
所以可以让等待函数在进入前自己把锁释放
但是如果条件满足,等待函数执行完毕,执行到临界区,该线程此时未上锁,岂不是会造成线程安全问题?
所以cond_wait函数同时拥有两个功能:
- 调用生产者等待的时候,会首先自动释放mtx,然后再挂起自己而不是一直带着锁挂着,占有资源
- 返回的时候,会首先自动竞争锁,获取到锁之后才能返回
有了这两个功能,上述问题就可以解决了
首先就是不过度占有资源。 如果没有第一点,某线程在进行等待的时候会带着锁进入等待队列,那么别的进程也无法进行锁竞争。
其次就是保证线程安全。 如果没有第二点,某线程等待完毕以后将会出现在临界区,由于没有上锁,会造成数据混乱;但是如果要求该线程等待完成以后马上进行锁竞争,就不会有这种情况了。
这里有一个细节,我们进行条件检测的时候需要进行循环检测,以保证退出循环一定是由于条件不满足,因为不排除会有这些情况:
- 挂起失败
这个可以理解为cond_wait函数调用失败
- 伪唤醒
可以理解为条件没有满足,但是程序却按照满足的情况往下走了
如果仅仅使用一次 if 判断,是不太完善的,这样的情况发生以后会继续向下走,会造成后续的生产函数错误。
总之要进行消费与生产,只要是按照对应条件下来进行的,就不会有原则性的错误。
所以上述条件判断应该改为:
while(isEmpty())
while(isFull())
3.4 唤醒函数
等待的函数不是自己能知道条件已满足,继续进行操作的,而是需要有一个唤醒的过程,通过生产者唤醒消费者,消费者唤醒生产者:
void WakeupConsumer()
{
//消费者正在empty_条件变量下等待
pthread_cond_signal(&empty_);
}
void WakeupProducter()
{
//生产者正在full_条件变量下等待
pthread_cond_signal(&full_);
}
4. 模型复盘
5. 总代码
测试函数:
#include "BlockQueue.hpp"
using namespace zcb;
using namespace std;
void* consumer(void* args)
{
BlockQueue<int>* bq = (BlockQueue<int>*)args;
while(true){
sleep(2);
int data = 0;
bq->Pop(&data);
cout<< "消费者消耗数据...->" << data << endl;
}
}
void* producter(void* args)
{
BlockQueue<int>* bq = (BlockQueue<int>*)args;
while(true){
int data = rand()%20 + 1;
cout<< "生产者生产数据...->" << data << endl;
bq->Push(data);
}
}
int main()
{
//随机数种子
srand((long long)time(nullptr));
//交易场所
BlockQueue<int>* bq = new BlockQueue<int>();
pthread_t c,p;
//bq传进去,拿到同一份交易场所
pthread_create(&c,nullptr,consumer,(void*)bq);
pthread_create(&p,nullptr,producter,(void*)bq);
pthread_join(c,nullptr);
pthread_join(p,nullptr);
return 0;
}
头文件实现:
#include <iostream>
#include <cstdio>
#include <string>
#include <unistd.h>
#include <pthread.h>
#include <queue>
#include <time.h>
#include <cstdlib>
#pragma once
namespace zcb
{
template<class T>
class BlockQueue
{
const int default_cap = 5;
private:
std::queue<T> bq_;//阻塞队列
int capacity_; //队列的元素上限
pthread_mutex_t mtx_;//保护临界资源的锁
//bq_空了,就让消费者在该条件变量下等待
//bq_满了,就让生产者在该条件下等待
pthread_cond_t full_;
pthread_cond_t empty_;
private:
bool isFull()
{
return bq_.size() == capacity_;
}
bool isEmpty()
{
return bq_.size() == 0;
}
void LockQueue()
{
pthread_mutex_lock(&mtx_);
}
void UnLockQueue()
{
pthread_mutex_unlock(&mtx_);
}
void WakeupConsumer()
{
//消费者正在empty_条件变量下等待
pthread_cond_signal(&empty_);
}
void WakeupProducter()
{
//生产者正在full_条件变量下等待
pthread_cond_signal(&full_);
}
public:
BlockQueue(int cap = default_cap)
:capacity_(cap)
{
pthread_mutex_init(&mtx_,nullptr);
pthread_cond_init(&full_,nullptr);
pthread_cond_init(&empty_,nullptr);
}
~BlockQueue()
{
pthread_mutex_destroy(&mtx_);
pthread_cond_destroy(&empty_);
pthread_cond_destroy(&full_);
}
void ProducterWait()
{
//在是否为空的条件变量下等待才有意义
//1. 调用生产者等待的时候,会首先自动释放mtx,然后再挂起自己
//而不是一直带着锁挂着,占有资源
//2. 返回的时候,会首先自动竞争锁,获取到锁之后才能返回
pthread_cond_wait(&empty_,mtx_);
}
void ConsumerWait()
{
//在是否满的条件变量下等待才有意义
pthread_cond_wait(&full_,mtx);
}
public:
//输入型参数用const T&
//队列使用前要加锁
void Push(const T& in)
{
LockQueue();
//向队列中放数据,生产函数
while(isFull()){
ProducterWait();
}
bq_.push(in);
//走到这里肯定有数据被生产
//队列不满,但是已有数据可消费,唤醒消费者
WakeupConsumer();
UnLockQueue();
}
//输出型参数用T*
void Pop(T* out)
{
//向队列中拿数据,消费函数
LockQueue();
while(isEmpty()){
//无法消费
ConsumerWait();
}
*out = bq_.front();
bq_.pop();
//走到这里肯定消费了数据
//那么此时适合生产,唤醒生产者
WakeupProducter();
UnLockQueue();
}
};
}