文章目录
- 前言
- 一、条件变量
- 同步概念与竞态条件
- 条件变量函数
- 初始化
- 销毁
- 等待条件满足
- 唤醒等待
- 为什么pthread_cond_wait需要互斥量?
- 条件变量使用规范
- 二、生产者消费者模型
- 为何要使用生产者消费者模型
- 生产者消费者模型的优点
- 基于BlockingQueue的生产者消费者模型
- C++ queue模拟阻塞队列的生产消费模型
- 三、POSIX信号量
- 函数
- 初始化信号量
- 销毁信号量
- 等待信号量
- 发布信号量
- 基于环形队列的生产消费模型
- 总结
前言
线程互斥,他是对的,但他不一定合理。因为互斥有可能导致饥饿问题。所谓饥饿问题就是一个执行流,长时间得不到某种资源。这小章我来基于条件变量带大家认识线程同步。
正文开始!
一、条件变量
- 当一个线程互斥地访问某个变量时,他可能发现在其他线程改变状态之前,他什么也做不了。
- 例如一个线程访问队列的时候,发现队列为空,他只能等待,直到其他线程将一个节点添加到队列中。这种情况就需要用到条件变量。
同步概念与竞态条件
- 同步:在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从而有效的避免饥饿问题,叫做同步
- 竞态条件:因为时序为题,而导致程序异常,我们称之为竞态条件。在这种场景下,这种问题也不难理解
条件变量函数
初始化
唤醒线程由系统唤醒变为让程序员自己唤醒。
条件变量必须要和互斥锁mutex一同使用。
cond:要初始化的条件变量
attr:默认为NULL
销毁
int pthread_cond_destroy(pthread_cond_t *cond)
等待条件满足
int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex);
参数:
cond:要在这个条件变量上等待
mutex:互斥量
唤醒等待
int pthread_cond_broadcast(pthread_cond_t *cond);
int pthread_cond_signal(pthread_cond_t *cond);
代码实现
#include <iostream>
#include <string.h>
#include <pthread.h>
#include <unistd.h>
using namespace std;
//定义一个条件变量
pthread_cond_t cond;
//定义一个互斥锁,并且初始化
pthread_mutex_t mutex=PTHREAD_MUTEX_INITIALIZER;
//定义全局退出变量
volatile bool quit=false;
void* waitCommand(void* args)
{
while(!quit)
{
//执行了下面的代码,证明某一种条件不就绪(现在还没有场景),要我这个线程等待
//三个线程,都会在条件变量下进行排队
pthread_mutex_lock(&mutex);
pthread_cond_wait(&cond,&mutex);//让对应的线程进行等待,等待被唤醒
cout<<"thread id "<<pthread_self()<<"run..."<<endl;
pthread_mutex_unlock(&mutex);
}
cout<<"thread id "<<pthread_self()<<"end..."<<endl;
return nullptr;
}
int main()
{
pthread_cond_init(&cond,nullptr);
pthread_t t1;
pthread_t t2;
pthread_t t3;
pthread_create(&t1,nullptr,waitCommand,nullptr);
pthread_create(&t2,nullptr,waitCommand,nullptr);
pthread_create(&t3,nullptr,waitCommand,nullptr);
while(true)
{
char n='a';
cout<<"请输入你的command(n/q): ";
cin>>n;
if(n=='n')
pthread_cond_broadcast(&cond);
//pthread_cond_signal(&cond);
else
{
quit=true;
break;
}
sleep(1);
}
pthread_cond_broadcast(&cond);
pthread_join(t1,nullptr);
pthread_join(t2,nullptr);
pthread_join(t3,nullptr);
pthread_cond_destroy(&cond);
pthread_mutex_destroy(&mutex);
return 0;
}
为什么pthread_cond_wait需要互斥量?
- 条件等待是一个线程间同步的一种手段,如果只有一个线程,条件不满足,一直等下去都不会满足,所以必须要有一个线程通过某些操作,改变共享变量,使原先不满足的条件变得满足,并且有友好的通知等待在条件变量上的线程。
- 条件不会无缘无故的突然变得满足了,必然会牵扯到共享数据的变化,所以一定要用互斥锁来保护,没有互斥锁就无法安全地获取和修改共享数据。
条件(对应的共享资源的状态)—>程序员要判断资源是否满足自己操作的要求
条件变量(条件满足或者不满足的时候,进行wait或signal的一种方式)
按照上面的说法,我们设计出如下的代码:先上锁,发现条件不满足,解锁,然后等待在条件变量上不就行了,如下代码:
// 错误的设计
pthread_mutex_lock(&mutex);
while (condition_is_false) {
pthread_mutex_unlock(&mutex);
//解锁之后,等待之前,条件可能已经满足,信号已经发出,但是该信号可能被错过
pthread_cond_wait(&cond);
pthread_mutex_lock(&mutex);
}
pthread_mutex_unlock(&mutex);
- 由于解锁和等待不是原子操作。调用解锁之后,pthread_cond_wait之前,如果已经有其他线程获取到互斥量,摒弃条件满足,发送了信号,那么pthread_cond_wait将错过这个信号,可能会导致线程用眼阻塞在这个pthread_cond_wait。所以解锁和等待必须是一个原子操作。
- int pthread_cond_wait(pthread_cond_t* cond,pthread_mutex_t* mutex);进入该函数后,回去看条件量等不等于零?等于,就把互斥量变成1,直到cond_Wait返回,把条件量改成1,把互斥量恢复成原样。
条件变量使用规范
- 等待条件代码
pthread_mutex_lock(&mutex);
while(条件为假)
pthread_cond_wait(&cond,&mutex);
//修改条件
pthread_mutex_unlick(&mutex);
- 给条件发送信号代码
pthread_mutex_lock(&mutex);
//设置条件为真
pthread_cond_signal(&c ond);
pthread_mutex_unlick(&mutex);
二、生产者消费者模型
生产者和生产者(互斥) 消费者和消费者(互斥) 生产者和消费者(互斥/同步)—>3种关系
生产者和消费者:线程承担的—>2种角色
超市:内存中特定的一种内存结构(数据结构)—>1个交易场所
1.如何让多个消费者线程等待呢?又如何让消费者线程被唤醒呢?
2.如何让多个生产者线程等待呢?又如何让生产者线程被唤醒呢?
3.如何衡量消费者和生产者所关心的条件是否就绪呢?
为何要使用生产者消费者模型
生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。
生产者消费者模型的优点
- 解耦
- 支持并发
- 支持忙闲不均
基于BlockingQueue的生产者消费者模型
在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型上的数据结构。其与普通队列区别在于,当队列为空时,从队列中获取元素的操作将会被阻塞,直到队列被放入了元素;当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入乐元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出来(以上的操作都有基于不同的线程来说的,线程在对阻塞队列进程操作时也会被阻塞)
C++ queue模拟阻塞队列的生产消费模型
为了便于理解,我以单生产者,但消费者来进行说明
BlockQueue.hpp
#pragma once
#include <iostream>
#include <queue>
#include<cstdlib>
#include <unistd.h>
#include<pthread.h>
using namespace std;
//新需求:我只想保存最新的5个任务,如果来了任务,老的任务,我想让他直接被丢弃
const uint32_t gDefaultCap=5;
template <class T>
class BlockQueue
{
public:
BlockQueue(uint32_t cap=gDefaultCap)
:_cap(cap)
{
pthread_mutex_init(&_mutex,nullptr);
pthread_cond_init(&_conCond,nullptr);
pthread_cond_init(&_proCond,nullptr);
}
~BlockQueue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_conCond);
pthread_cond_destroy(&_proCond);
}
//生产接口
void push(const T& in)
{
//加锁
//判断-->是否适合生产-->bq是否为满-->程序员视角的条件-->1.满(不生产) 2.不满(生产)
//if(满) 不生产,休眠
//else if(不满) 生产,唤醒消费者
//生产
//解锁
lockQueue();
//if(isFull())//未知原因导致函数调用错误,导致条件不满足
while(isFull())
{
//before:当我等待的时候,会自动释放_mutex
proBlockWait();//阻塞等待,等待被唤醒,被唤醒!=条件满足(概率虽然非常小)
//after:当我醒来的时候,我是在临界区里醒来的!!
}
//条件满足,可以生产
pushCore(in);
unlockQueue();
wakeupCon();//唤醒消费者
}
//消费接口
T pop()
{
//加锁
//判断-->是否适合消费-->bq是否为空-->程序员视角的条件-->1.空(不消费) 2.非空(消费)
//if(空) 不消费,休眠
//else if(非空) 消费,唤醒生产者
//消费
//解锁
lockQueue();
//if(isEmpty())//未知原因导致函数调用错误,导致条件不满足
while(isEmpty())
{
conBlockWait();//阻塞等待,等待被唤醒
}
//条件满足,可以消费
T tmp=popCore();
unlockQueue();
wakeupPro();//唤醒生产者
return tmp;
}
private:
void lockQueue()
{
pthread_mutex_lock(&_mutex);
}
void unlockQueue()
{
pthread_mutex_unlock(&_mutex);
}
bool isEmpty()
{
return _bq.empty();
}
bool isFull()
{
return _bq.size()==_cap;
}
void proBlockWait()//生产者一定是在临界区中的!
{
//1.在阻塞线程的时候,会自动释放_mutex锁
pthread_cond_wait(&_proCond,&_mutex);
//2.当阻塞结束返回的时候,pthread_cond_wait,会自动帮你重新获得_mutex,然后才返回
}
void conBlockWait()//阻塞等待,等待被唤醒
{
//1.在阻塞线程的时候,会自动释放_mutex锁
pthread_cond_wait(&_conCond,&_mutex);
//2.当阻塞结束返回的时候,pthread_cond_wait,会自动帮你重新获得_mutex,然后才返回
}
void wakeupCon()//唤醒消费者
{
pthread_cond_signal(&_conCond);
}
void wakeupPro()//唤醒生产者
{
pthread_cond_signal(&_proCond);
}
void pushCore(const T& in)
{
_bq.push(in);//生产完成
}
T popCore()
{
T tmp=_bq.front();
_bq.pop();
return tmp;
}
private:
queue<T> _bq;//blockqueue
uint32_t _cap;//容量
pthread_mutex_t _mutex;//保护阻塞队列的互斥锁
pthread_cond_t _conCond;//让消费者等待的条件变量
pthread_cond_t _proCond;//让生产者等待的条件变量
};
Test.cc
void* consumer(void* args)
{
BlockQueue<int>* bqp=static_cast<BlockQueue<int>*>(args);
while(true)
{
//sleep(2);//消费的慢一点
//1.消费数据
int data=bqp->pop();
cout<<"consumer 消费数据完成: "<<data<<endl;
}
}
void* productor(void* args)
{
BlockQueue<int>* bqp=static_cast<BlockQueue<int>*>(args);
while(true)
{
//1.制作数据
int data=rand()%10;
//2.生产数据
bqp->push(data);
cout<<"productor 生产数据完成: "<<data<<endl;
sleep(2);//生产慢一些
}
}
int main()
{
srand((unsigned long)time(nullptr)^getpid());
//定义一个阻塞队列
//创建两个线程,productor,consumer
//productor-----------consumer
BlockQueue<int> bq;
pthread_t c,p;
pthread_create(&c,nullptr,consumer,&bq);
pthread_create(&p,nullptr,productor,&bq);
pthread_join(c,nullptr);
pthread_join(p,nullptr);
return 0;
}
生产的慢一些
消费的慢一些
模拟实现计算任务
//BlockQueue.hpp
#pragma once
#include <iostream>
#include <queue>
#include<cstdlib>
#include <unistd.h>
#include<pthread.h>
using namespace std;
//新需求:我只想保存最新的5个任务,如果来了任务,老的任务,我想让他直接被丢弃
const uint32_t gDefaultCap=5;
template <class T>
class BlockQueue
{
public:
BlockQueue(uint32_t cap=gDefaultCap)
:_cap(cap)
{
pthread_mutex_init(&_mutex,nullptr);
pthread_cond_init(&_conCond,nullptr);
pthread_cond_init(&_proCond,nullptr);
}
~BlockQueue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_conCond);
pthread_cond_destroy(&_proCond);
}
//生产接口
void push(const T& in)
{
//加锁
//判断-->是否适合生产-->bq是否为满-->程序员视角的条件-->1.满(不生产) 2.不满(生产)
//if(满) 不生产,休眠
//else if(不满) 生产,唤醒消费者
//生产
//解锁
lockQueue();
//if(isFull())//未知原因导致函数调用错误,导致条件不满足
while(isFull())
{
//before:当我等待的时候,会自动释放_mutex
proBlockWait();//阻塞等待,等待被唤醒,被唤醒!=条件满足(概率虽然非常小)
//after:当我醒来的时候,我是在临界区里醒来的!!
}
//条件满足,可以生产
pushCore(in);
unlockQueue();
wakeupCon();//唤醒消费者
}
//消费接口
T pop()
{
//加锁
//判断-->是否适合消费-->bq是否为空-->程序员视角的条件-->1.空(不消费) 2.非空(消费)
//if(空) 不消费,休眠
//else if(非空) 消费,唤醒生产者
//消费
//解锁
lockQueue();
//if(isEmpty())//未知原因导致函数调用错误,导致条件不满足
while(isEmpty())
{
conBlockWait();//阻塞等待,等待被唤醒
}
//条件满足,可以消费
T tmp=popCore();
unlockQueue();
wakeupPro();//唤醒生产者
return tmp;
}
private:
void lockQueue()
{
pthread_mutex_lock(&_mutex);
}
void unlockQueue()
{
pthread_mutex_unlock(&_mutex);
}
bool isEmpty()
{
return _bq.empty();
}
bool isFull()
{
return _bq.size()==_cap;
}
void proBlockWait()//生产者一定是在临界区中的!
{
//1.在阻塞线程的时候,会自动释放_mutex锁
pthread_cond_wait(&_proCond,&_mutex);
//2.当阻塞结束返回的时候,pthread_cond_wait,会自动帮你重新获得_mutex,然后才返回
}
void conBlockWait()//阻塞等待,等待被唤醒
{
//1.在阻塞线程的时候,会自动释放_mutex锁
pthread_cond_wait(&_conCond,&_mutex);
//2.当阻塞结束返回的时候,pthread_cond_wait,会自动帮你重新获得_mutex,然后才返回
}
void wakeupCon()//唤醒消费者
{
pthread_cond_signal(&_conCond);
}
void wakeupPro()//唤醒生产者
{
pthread_cond_signal(&_proCond);
}
void pushCore(const T& in)
{
_bq.push(in);//生产完成
}
T popCore()
{
T tmp=_bq.front();
_bq.pop();
return tmp;
}
private:
queue<T> _bq;//blockqueue
uint32_t _cap;//容量
pthread_mutex_t _mutex;//保护阻塞队列的互斥锁
pthread_cond_t _conCond;//让消费者等待的条件变量
pthread_cond_t _proCond;//让生产者等待的条件变量
};
//Task.hpp
#pragma once
#include <iostream>
#include <string>
class Task
{
public:
Task(int one = 0, int two = 0, char op = '+')
: _elemOne(one), _elemTwo(two), _operator(op)
{}
int operator()()
{
return run();
}
int run()
{
int result = 0;
switch (_operator)
{
case '+':
result = _elemOne + _elemTwo;
break;
case '-':
result = _elemOne - _elemTwo;
break;
case '*':
result = _elemOne * _elemTwo;
break;
case '/':
{
if (_elemTwo == 0)
{
std::cout << "div zero,abort" << std::endl;
result = -1;
}
else
{
result = _elemOne / _elemTwo;
}
}
break;
case '%':
{
if (_elemTwo == 0)
{
std::cout << "mod zero,abort" << std::endl;
result = -1;
}
else
{
result = _elemOne % _elemTwo;
}
}
break;
default:
std::cout << "非法操作: " << _operator << std::endl;
break;
}
return result;
}
void get(int& one,int& two,char& op)
{
one=_elemOne;
two=_elemTwo;
op=_operator;
}
private:
int _elemOne;
int _elemTwo;
char _operator;
};
//main.cc
#include "BlockQueue.hpp"
#include"Task.hpp"
#include<ctime>
const std::string ops="+-*/%";
//并发,并不是在临界区中并发(一般),而是在生产前(before blockqueue),消费后(after blockqueue)对应的并发
void* consumer(void* args)
{
BlockQueue<Task>* bqp=static_cast<BlockQueue<Task>*>(args);
while(true)
{
//1.消费任务
Task t=bqp->pop();
//处理任务
int result=t();
int one,two;
char op;
t.get(one,two,op);//输出型参数
cout<<"consumer["<<pthread_self()<<"]"<<(unsigned long)time(nullptr)
<<" 消费了一个任务: "<<one<<op<<two<<"="<<result<<endl;
}
}
void* productor(void* args)
{
BlockQueue<Task>* bqp=static_cast<BlockQueue<Task>*>(args);
while(true)
{
//1.制作任务
int one=rand()%50;
int two=rand()%20;
char op=ops[rand()%ops.size()];
Task t(one,two,op);
//2.生产任务
bqp->push(t);
cout<<"productor["<<pthread_self()<<"]"<<(unsigned long)time(nullptr)
<<" 生产了一个任务: "<<one<<op<<two<<"=?"<<endl;
}
}
int main()
{
srand((unsigned long)time(nullptr)^getpid());
BlockQueue<Task> bq;
pthread_t c,p;
pthread_create(&c,nullptr,consumer,&bq);
pthread_create(&p,nullptr,productor,&bq);
pthread_join(c,nullptr);
pthread_join(p,nullptr);
return 0;
}
三、POSIX信号量
信号量是一个计数器,描述临界资源数量的计数器
计数器-- —> P—>申请资源
计数器++ —> V—>释放资源
信号量如果为1的话就称之为二元信号量!
1.信号量申请成功了,就一定保证你会拥有一部分临界资源!(资源预定机制)
2.临界资源可以当成整体,可以看做一小部分一小部分!(结合场景)
- POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源的目的。但POSIX可以用于线程间同步。
函数
初始化信号量
参数
pshared :0表示线程间共享,非零表示进程间共享
value:信号量的初始值
销毁信号量
等待信号量
功能:等待信号量,会将信号量的值减1
发布信号量
功能:发布信号量,表示资源使用完毕,可以归还资源了,将信号量的值加1
刚才我们写的生产着消费者的例子是基于queue的,起空间可以动态分配,现在是基于固定大小的环形队列重写这个程序(POSIX信号量):
基于环形队列的生产消费模型
- 环形队列采用数组模拟,用模运算来模拟环形特性。
什么时候会发生访问同一个位置呢?
1.我们两个指向同一个位置的时候,只有满or空的时候!(互斥和同步)
其他的时候,都指向不同的位置!(并发)
注意:(后续操作的基本原则)
1.空:消费者不能超过生产者。—>(生产者先运行)
2.满:生产者不能把消费者套一个圈,往后继续写入。—>(消费者先运行)
信号量来保证以上的条件!!!
生产者最关心的是空间资源!!—>N—>P(N) N:[N,0]
消费者最关心的是数据资源!!—>N—>V(N) N:[0,N]
//RingQueue.hpp
#pragma once
#include<iostream>
#include<string>
#include<vector>
#include<semaphore.h>
using namespace std;
const int gCap=5;
template<class T>
class RingQueue
{
public:
RingQueue(int cap=gCap)
:_ringqueue(cap)
,_pIndex(0)
,_cIndex(0)
{
//生产者使用的
sem_init(&_roomSem,0,_ringqueue.size());
//消费者使用的
sem_init(&_dataSem,0,0);
pthread_mutex_init(&_pmutex,nullptr);
pthread_mutex_init(&_cmutex,nullptr);
}
//生产者
void push(const T& in)
{
sem_wait(&_roomSem);
pthread_mutex_lock(&_pmutex);
_ringqueue[_pIndex]=in;//生产的过程
_pIndex++;//写入位置后移
_pIndex%=_ringqueue.size();//更新下标,保证环形特性
pthread_mutex_unlock(&_pmutex);
sem_post(&_dataSem);
}
//消费者
T pop()
{
sem_wait(&_dataSem);
pthread_mutex_lock(&_cmutex);
T tmp=_ringqueue[_cIndex];
_cIndex++;
_cIndex%=_ringqueue.size();
pthread_mutex_unlock(&_cmutex);
sem_post(&_roomSem);
return tmp;
}
~RingQueue()
{
sem_destroy(&_roomSem);
sem_destroy(&_dataSem);
pthread_mutex_destroy(&_pmutex);
pthread_mutex_destroy(&_cmutex);
}
private:
vector<T> _ringqueue;//环形队列
sem_t _roomSem; //衡量空间计数器,productor
sem_t _dataSem; //衡量数据计数器,consumer
int _pIndex; //当前生产者写入的位置,如果是多线程,_pIndex也是临界资源
int _cIndex; //当前消费者读取的位置,如果是多线程,_cIndex也是临界资源
pthread_mutex_t _pmutex;
pthread_mutex_t _cmutex;
};
//main.cc
#include"RingQueue.hpp"
#include<ctime>
#include<unistd.h>
void* productor(void* args)
{
RingQueue<int>* rqp=static_cast<RingQueue<int>*>(args);
while(true)
{
//sleep(2);
int data=rand()%10;
rqp->push(data);
cout<<"pthread["<<pthread_self()<<"] 生产了一个数据: "<<data<<endl;
}
}
void* consumer(void* args)
{
RingQueue<int>* rqp=static_cast<RingQueue<int>*>(args);
while(true)
{
sleep(2);
int data=rqp->pop();
cout<<"pthread["<<pthread_self()<<"] 消费了一个数据: "<<data<<endl;
}
}
int main()
{
srand((unsigned long)time(nullptr)^getpid());
RingQueue<int> rq;
pthread_t c,p;
pthread_create(&p,nullptr,productor,&rq);
pthread_create(&c,nullptr,consumer,&rq);
pthread_join(p,nullptr);
pthread_join(c,nullptr);
return 0;
}
消费线程消费慢一点
生产线程生产的慢一点
总结
(本章完!)