文章目录
- 1.生产者消费者模型
- 1.1生产者消费者模型的特点
- 1.2生产者消费者模型的原则
- 1.3生产者消费者模型的优点
- 2.基于阻塞队列的生产者消费者模型
- 2.1如何理解生产者消费者模型的并发?
- 3.信号量
- 3.1信号量接口
- 3.2基于环形队列的生产者消费者模型
- 3.3信号量和条件变量的区别
1.生产者消费者模型
生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。
生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列;消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。
1.1生产者消费者模型的特点
生产者消费者是多线程同步互斥的一个经典场景。有以下的特点:
- 生产者与消费者互斥的情况:生产者和消费者之间存在竞争锁的现象。这就是生产者和消费者的互斥关系。
- 生产者与消费者同步的情况:如果让生产者一直生产,那么当生产者生产的数据将容器塞满后,生产者再生产数据就会生产失败。反之,让消费者一直消费,那么当容器当中的数据被消费完后,消费者再进行消费就会消费失败。这就是同步现象
- 生产者与生产者之间的关系,消费者与消费者之间的关系(商品缺少的极端情况)都是互斥的。
为什么生产者与消费者、生产者与生产者、消费者与消费者之间都存在互斥关系?
介于生产者和消费者之间的容器可能会被多个执行流同时访问,因此我们需要将该临界资源用互斥锁保护起来。
其中,所有的生产者和消费者都会竞争式的申请锁,因此生产者和生产者、消费者和消费者、生产者和消费者之间都存在互斥关系。
为什么生产者和消费者需要同步关系?
如果让生产者一直生产,那么当生产者生产的数据将容器塞满后,生产者再生产数据就会生产失败。反之,让消费者一直消费,那么当容器当中的数据被消费完后,消费者再进行消费就会消费失败。这就是同步现象。
1.2生产者消费者模型的原则
以上三种可以总结为生产者与消费者模型的特点:321原则
- 三种关系:生产者与生产者之间有互斥关系,消费者与消费者之间有互斥关系。生产者与消费者之间有互斥同步关系
- 两种角色:生产者线程和消费者线程
- 一个交易场所:可以是链表或者队列
1.3生产者消费者模型的优点
- 解耦
- 支持并发
- 支持忙闲不均
我们在主函数中调用某一函数,那么我们必须等该函数体执行完后才继续执行主函数的后续代码,因此函数调用本质上是一种紧耦合。
在生产者消费者模型中,生产者只负责生产数据并放入队列中,消费者只负责从队列中取走数据,并使用数据。因此生产者消费者模型本质是一种松耦合。
2.基于阻塞队列的生产者消费者模型
因为可能同时存在多个生产者和消费者,生产者生产的商品放入队列的尾部,消费者从队列的头部取走商品,因此需要两把锁对队列头和尾进行保护。
同时当队列不为空时,需要通知消费者进行消费,当队列为空时,需要通知生产者进行生产,因此需要两个条件变量分别通知生产者和消费者。
基本框架
template <class T>
class Blockqueue{
public:
void push(){//生产
//加锁
//判断队列是否满
if(队列满了){//不生产,等待唤醒
}
else{//不满,生产
}
//解锁
}
void pop(){//消费
//加锁
//判断是否为空
if(为空){//不消费,等待唤醒
}
else{//消费
}
//解锁
}
private:
queue<T> bq_; //阻塞队列
uint32_t cap_; //容量
pthread_mutex_t mutex_head; //保护阻塞队列头部的互斥量
pthread_mutex_t mutex_tail; //保护阻塞队列尾部的互斥量
pthread_cond_t concond_; //让消费者等待的条件变量
pthread_cond_t procond_; //让生产者等待的条件
}
blockqueue.hpp
阻塞队列模板的实现。
#include <iostream>
#include <queue>
#include <cstdlib>
#include <unistd.h>
#include <pthread.h>
using namespace std;
const uint32_t defaultcap=15;
template <class T>
class Blockqueue{
public:
Blockqueue(uint32_t cap=defaultcap):cap_(cap)
{
pthread_cond_init(&concond_,nullptr);
pthread_cond_init(&procond_,nullptr);
pthread_mutex_init(&mutex_head,nullptr);
pthread_mutex_init(&mutex_tail,nullptr);
}
~Blockqueue(){
pthread_cond_destroy(&concond_);
pthread_cond_destroy(&procond_);
pthread_mutex_destroy(&mutex_head);
pthread_mutex_destroy(&mutex_tail);
}
void push(const T &in){//生产
//加锁
//判断队列是否满
//队列满了,不生产,等待唤醒
//队列不满,生产
//解锁
lockqueue(&mutex_tail);
while(isfull()){ //如果满了,等待被唤醒生产
prowaitblock();
}
pushCore(in);
unlockqueue(&mutex_tail);
//唤醒消费者
wakecon();
}
T pop(){//消费
//加锁
//判断是否为空
//不消费,等待唤醒
//消费
//解锁
lockqueue(&mutex_head);
while(isempty()){
conwaitblock();
}
T tmp=popcore();
unlockqueue(&mutex_head);
wakepro();
return tmp;
}
private:
void prowaitblock(){
pthread_cond_wait(&procond_,&mutex_tail);
}
void conwaitblock(){
pthread_cond_wait(&concond_,&mutex_head);
}
void wakecon(){
pthread_cond_signal(&concond_);
}
void wakepro(){
pthread_cond_signal(&procond_);
}
void lockqueue(pthread_mutex_t* mutexq){
pthread_mutex_unlock(mutexq);
}
void unlockqueue(pthread_mutex_t* mutexq){
pthread_mutex_unlock(mutexq);
}
bool isfull(){
return bq_.size()==cap_;
}
bool isempty(){
return bq_.empty();
}
void pushCore(const T &in){
bq_.push(in);
}
T popcore(){
T tmp=bq_.front();
bq_.pop();
return tmp;
}
private:
queue<T> bq_; //阻塞队列
uint32_t cap_; //容量
pthread_mutex_t mutex_head; //保护阻塞队列头部的互斥量
pthread_mutex_t mutex_tail;
pthread_cond_t concond_; //让消费者等待的条件变量
pthread_cond_t procond_; //让生产者等待的条件
};
task.hpp
阻塞队列存放的任务类
#pragma once
#include <iostream>
#include <string>
class Task
{
public:
Task() : elemOne_(0), elemTwo_(0), operator_('0'){
}
Task(int one, int two, char op) : elemOne_(one), elemTwo_(two), operator_(op){
}
int operator() (){
return run();
}
int run(){
int result = 0;
switch (operator_)
{
case '+':
result = elemOne_ + elemTwo_;
break;
case '-':
result = elemOne_ - elemTwo_;
break;
case '*':
result = elemOne_ * elemTwo_;
break;
case '/':
{
if (elemTwo_ == 0){
std::cout << "div zero, abort" << std::endl;
result = -1;
}
else{
result = elemOne_ / elemTwo_;
}
}
break;
case '%':
{
if (elemTwo_ == 0){
std::cout << "mod zero, abort" << std::endl;
result = -1;
}
else{
result = elemOne_ % elemTwo_;
}
}
break;
default:
std::cout << "非法操作: " << operator_ << std::endl;
break;
}
return result;
}
int get(int *e1, int *e2, char *op){
*e1 = elemOne_;
*e2 = elemTwo_;
*op = operator_;
}
private:
int elemOne_;
int elemTwo_;
char operator_;
};
测试程序
#include "blockqueue.hpp"
#include "task.hpp"
#include <ctime>
const std::string ops = "+-*/%";
void* consumer_run(void* arg){
Blockqueue<Task>* blq=(Blockqueue<Task>*)arg;
while(true){ //循环获取任务,并执行
Task t=blq->pop();
int result=t();
int em1,em2;
char op;
t.get(&em1,&em2,&op);
cout << "consumer[" << pthread_self() << "] " << (unsigned long)time(nullptr)
<< " 消费了一个任务: " << em1 << op << em2 << "=" << result << endl;
}
return nullptr;
}
void* productor_run(void* arg){
Blockqueue<Task>*blq=(Blockqueue<Task>*)arg;
while (true)
{
int em1=rand()%100,em2=rand()%30;
char op=ops[rand()%4];
Task t(em1,em2,op);
blq->push(t);
cout << "producter[" << pthread_self() << "] " << (unsigned long)time(nullptr)
<< " 生产了一个任务: " << em1 << op << em2 << "=?" << endl;
sleep(1);
}
return nullptr;
}
int main(){
srand((unsigned long)time(nullptr) ^ getpid());
Blockqueue<Task> blq; //创建一个阻塞队列
pthread_t c1,c2,c3,c4,p1,p2,p3,p4,p5;
pthread_create(&c1,nullptr,consumer_run,&blq);
pthread_create(&c2,nullptr,consumer_run,&blq);
pthread_create(&c3,nullptr,consumer_run,&blq);
pthread_create(&c4,nullptr,consumer_run,&blq);
pthread_create(&p1,nullptr,productor_run,&blq);
pthread_create(&p2,nullptr,productor_run,&blq);
pthread_create(&p3,nullptr,productor_run,&blq);
pthread_create(&p4,nullptr,productor_run,&blq);
pthread_create(&p5,nullptr,productor_run,&blq);
//回收线程
pthread_join(c1,nullptr);
pthread_join(c2,nullptr);
pthread_join(c3,nullptr);
pthread_join(c4,nullptr);
pthread_join(p1,nullptr);
pthread_join(p2,nullptr);
pthread_join(p3,nullptr);
pthread_join(p4,nullptr);
pthread_join(p5,nullptr);
return 0;
}
程序执行的结果
2.1如何理解生产者消费者模型的并发?
并发不是在临界区的并发,由于临界区存在锁(只有一个线程可以访问资源),因此不可能做到并发。
这里的并发是指:生产前(制造任务要花时间)和消费后(执行任务需要时间)的并发。
- 多个生产者在同时生产任务;只不过在将任务放入队列的时候,同一时刻只有一个线程可以放入任务。
- 多个消费者在同时执行任务;只不过在将任务取出的时候,同一时刻只有一个线程可以取出任务。
3.信号量
POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 但POSIX可以用于线程间同步 。
- 临界资源其实不一定只能一个线程同时访问的,如果我们的临界资源是可以看做多份的情况下,可能做到让多个线程同时访问的;只要访问的是不同的资源就可。
- 任何线程,如果想访问临界资源中的某一个,一定必须先申请信号量,使用完毕,必须释放信号量
- 申请信号量表示可以使用临界资源中的某一个
- 信号量本质是一种资源的预定机制。
信号量是一种特殊的变量,它只能取自然数值并且只支持两种操作:等待(wait)和信号(signal)。但是在Linux/UNIX中,“等待”和“信号”都已经具有特殊的含义。所以我们用的称呼是P、V操作。
- P操作(申请资源):如果信号量的值大于0,就减1。如果值为0,则挂起等待
- V操作(发布资源):如果有其他进程因为等待SV而挂起,则唤醒之;如果没有,则将小信号量的值加1。
互斥量与信号量的关系?
- 二元信号量就是互斥量。互斥量的值只能取0或者1。
- 当信号量的初值为1,且在0和1之间变化时,成为二元信号量。其与互斥锁的区别在于:互斥锁有拥有者这一概念,信号量则没有。互斥锁由同一线程加放锁,信号量可以由不同线程进行PV操作。
- 计数信号量允许多个线程,且值为剩余可用资源数量。互斥锁保证多个线程对一个共享资源的互斥访问,信号量用于协调多个线程对一系列资源的访问
3.1信号量接口
初始化信号量
#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
参数说明:
- pshared:0表示线程间共享,非零表示进程间共享
- value:信号量初始值
销毁信号量
int sem_destroy(sem_t *sem);
等待信号量
功能:等待信号量,会将信号量的值减1。当信号量为0时,挂起等待被唤醒
int sem_wait(sem_t *sem);
发布信号量
功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1。
int sem_post(sem_t *sem);
3.2基于环形队列的生产者消费者模型
- 环形队列采用数组模拟,用模运算来模拟环状特性
- 环形结构起始状态和结束状态都是一样的,不好判断为空或者为满,所以可以通过加计数器或者标记位来,这里可以直接使用信号量代替计数器
- 判断满或者空。另外也可以预留一个空的位置,作为满的状态
ringqueue.hpp
环形队列的实现
#pragma once
#include <iostream>
#include <vector>
#include <string>
#include <semaphore.h>
using namespace std;
const int gcap = 10;
template <class T>
class Ringqueue{
public:
Ringqueue():ringqueue_(gcap),pIndex_(0),cIndex_(0)
{
pthread_mutex_init(&pmutex_,nullptr);
pthread_mutex_init(&cmutex_,nullptr);
sem_init(&roomsem_,0,ringqueue_.size());
sem_init(&datasem_,0,0);
}
~Ringqueue(){
pthread_mutex_destroy(&pmutex_);
pthread_mutex_destroy(&cmutex_);
sem_destroy(&roomsem_);
sem_destroy(&datasem_);
}
void push(const T& t){ //生产
sem_wait(&roomsem_);
pthread_mutex_lock(&pmutex_);
ringqueue_[pIndex_]=t;
pIndex_++;
pIndex_%=ringqueue_.size();
pthread_mutex_unlock(&pmutex_);
sem_post(&datasem_);
}
T pop(){
sem_wait(&datasem_);
pthread_mutex_lock(&cmutex_);
T tmp=ringqueue_[cIndex_];
cIndex_++;
cIndex_%=ringqueue_.size();
pthread_mutex_unlock(&cmutex_);
sem_post(&roomsem_);
return tmp;
}
private:
vector<T> ringqueue_; //阻塞队列
uint32_t pIndex_; // 当前生产者写入的位置, 如果是多线程,pIndex_也是临界资源
uint32_t cIndex_; // 当前消费者读取的位置,如果是多线程,cIndex_也是临界资源
pthread_mutex_t pmutex_;
pthread_mutex_t cmutex_;
sem_t roomsem_;
sem_t datasem_;
};
接口的测试
#include "ringqueue.hpp"
#include <ctime>
#include <unistd.h>
#include"task.hpp"
const std::string ops = "+-*/%";
void *productor(void *args)
{
Ringqueue<int> *rqp = static_cast<Ringqueue<int> *>(args);
while(true)
{
int data = rand()%10;
rqp->push(data);
cout << "pthread[" << pthread_self() << "]" << " 生产了一个数据: " << data << endl;
sleep(1);
}
}
void *consumer(void *args)
{
Ringqueue<int> *rqp = static_cast<Ringqueue<int> *>(args);
while(true)
{
//sleep(10);
int data = rqp->pop();
cout << "pthread[" << pthread_self() << "]" << " 消费了一个数据: " << data << endl;
}
}
int main()
{
srand((unsigned long)time(nullptr)^getpid());
Ringqueue<int> rq;
pthread_t c1,c2,c3, p1,p2,p3;
pthread_create(&p1, nullptr, productor, &rq);
pthread_create(&p2, nullptr, productor, &rq);
pthread_create(&p3, nullptr, productor, &rq);
pthread_create(&c1, nullptr, consumer, &rq);
pthread_create(&c2, nullptr, consumer, &rq);
pthread_create(&c3, nullptr, consumer, &rq);
pthread_join(c1, nullptr);
pthread_join(c2, nullptr);
pthread_join(c3, nullptr);
pthread_join(p1, nullptr);
pthread_join(p2, nullptr);
pthread_join(p3, nullptr);
return 0;
}
执行结果:
3.3信号量和条件变量的区别
信号量利用条件变量、互斥锁、计数器实现,计数器就是信号量的核心,信号量是条件变量的高级抽象
区别:
- (1)使用条件变量可以一次唤醒所有等待者(广播),而这个信号量没有的功能。
- (2)信号量是有一个值(状态的),而条件变量是没有的,没有地方记录唤醒(发送信号)过多少次,也没有地方记录唤醒线程(wait返回)过多少次。从实现上来说一个信号量可以是用mutex + counter + condition variable实现的。