基于环形队列RingQueue的生产消费模型
- 一、引入
- 二、信号量
- 2.1 信号量概念
- 2.2 信号量PV操作
- 2.3 POSIX信号量接口
- 三、基于环形队列(RingQueue)的生产消费模型
- 3.1 设计思路
- 3.2 结构设计图
- 3.3 单生产单消费代码实现
- 四、多生产多消费情形
- 五、小结
- 5.1 多生产多消费的意义
- 5.2 条件变量与信号量
一、引入
void push(const T &in)
{
pthread_mutex_lock(&mutex_);
while (is_full()) pthread_cond_wait(&pcond_, &mutex_);
q_.push(in);
pthread_cond_signal(&ccond_);
pthread_mutex_unlock(&mutex_);
}
前面说到了基于阻塞队列的生产消费模型中访问临界资源的时候要先去判断是否满足条件;因为我们在操作临界资源的时候,有可能不满足条件,但是我们无法提前知晓,所以只能先加锁再检测,根据检测结果,决定下一步怎么走。
就像上述对资源整体加锁,就默认了对资源整体使用,在其他线程角度,某个线程访问资源就是对整体访问;但是实际情况中可能存在一份公共资源,允许并发的访问不同的区域块!这就不得不提到信号量
二、信号量
2.1 信号量概念
-
信号量本质是一把计数器,衡量临界资源中资源数量多少的计数器
-
只要拥有信号量,未来一定能够拥有临界资源的一部分。申请信号量的本质:对临界资源中特定小块资源的预定机制。
-
信号量是一个计数器加上先申请,这就会令我们预先知道临界资源使用条件,就不需要像阻塞队列一样判断了
线程要进行访问临界资源中的某一区域,先申请信号量,前提是所有线程必须先看到信号量,所以信号量必须是公共资源
2.2 信号量PV操作
信号量:sem_t sem=5;
P操作:sem--
,申请操作,必须保证操作的原子性
V操作:sem++
,归还资源,必须保证操作的原子性
前面我们提到了锁是共享资源,但它的底层结构设计保证了它的原子性操作,对信号量也适用
信号量的核心操作就是PV原语
2.3 POSIX信号量接口
#include <semaphore.h>
//信号量初始化
int sem_init(sem_t *sem, int pshared, unsigned int value)
//sem:信号量
//pshared:0表示线程间共享,非零表示进程间共享。
//value:信号量初始值(资源数目)。
//信号量销毁
int sem_destroy(sem_t *sem)
//信号量等待
int sem_wait(sem_t *sem):p操作,--
//信号量发布
int sem_pos(sem_t *sem):V操作,++
三、基于环形队列(RingQueue)的生产消费模型
关于环形队列可参考我的博客环形队列这里不做过多叙述
3.1 设计思路
这里的判满不需要关心front和rear之间留一个位置
生产者和消费者访问同一个位置的情况:空的时候,满的时候;
其他情况下生产者与消费者访问的就是不同的区域了。
核心的三种条件
- 消费者不能超过生产者(没生产怎么消费?)
- 生产者不能超过消费者一个圈以上(否则数据会覆盖)
- 生产者和消费者指向同一个位置时,如果此时满了就让消费者先走,如果此时为空就让生产者先走
- 对于生产者,需要知道队列中的剩余空间,空间资源定义成一个信号量
- 对于消费者,需要知道队列中的数据资源,数据资源定义成一个信号量
先来预热一下伪代码
虽然不知道生产者消费者那个线程先运行,但是先申请成功的只能是生产者!这里需要注意的是,生产者生产之后任务已经在队列中了,需要对消费者的信号量进行++
,反之亦然!这样就满足了上述的三种条件
3.2 结构设计图
3.3 单生产单消费代码实现
//RingQueue.hpp
#pragma once
#include <iostream>
#include <cassert>
#include <vector>
#include <semaphore.h>
static const int gcap=5;
template<class T>
class RingQueue
{
private:
void P(sem_t& sem)
{
int n=sem_wait(&sem);
assert(n==0);
(void)n;
}
void V(sem_t& sem)
{
int n=sem_post(&sem);
assert(n==0);
(void)n;
}
public:
RingQueue(const int& cap=gcap)
:queue_(cap),cap_(cap)
{
//初始化信号量
int n=sem_init(&spacesem_,0,cap_);
assert(n==0);
n=sem_init(&datasem_,0,0);
assert(n==0);
pStep_=cStep_=0;
}
//生产者
void push(const T& in)
{
P(spacesem_);
queue_[pStep_++]=in;
pStep_%=cap_;//避免越界
V(datasem_);
}
//消费者
void pop(T* out)
{
P(datasem_);
*out=queue_[cStep_++];
cStep_%=cap_;
V(spacesem_);
}
~RingQueue()
{
sem_destroy(&spacesem_);
sem_destroy(&datasem_);
}
private:
std::vector<T> queue_;
int cap_;
sem_t datasem_;//消费者--数据资源
sem_t spacesem_;//生产者--空间资源
int pStep_;//生产者数组下标
int cStep_;//消费者数组下标
};
//Task.hpp
#pragma once
#include <iostream>
#include <cstdio>
#include <cstring>
#include <functional>
class Task//计算任务
{
public:
using func_t =std::function<int(int,int,char)>;
Task(){}
Task(int x,int y,char op,func_t callback)
:x_(x),y_(y),op_(op),callback_(callback)
{
}
std::string operator()()
{
int result=callback_(x_,y_,op_);
char buffer[1024];
snprintf(buffer,sizeof buffer,"%d %c %d=%d",x_,op_,y_,result);
return buffer;
}
std::string toTaskString()
{
char buffer[1024];
snprintf(buffer,sizeof buffer,"%d %c %d=?",x_,op_,y_);
return buffer;
}
private:
int x_;
int y_;
char op_;
func_t callback_;
};
const std::string oper = "+-*/%";
int mymath(int x,int y,char op)
{
int result = 0;
switch (op)
{
case '+':
result = x + y;
break;
case '-':
result = x - y;
break;
case '*':
result = x * y;
break;
case '/':
{
if (y == 0)
{
std::cerr << "div zero error!" << std::endl;
result = -1;
}
else
result = x / y;
}
break;
case '%':
{
if (y == 0)
{
std::cerr << "mod zero error!" << std::endl;
result = -1;
}
else
result = x % y;
}
break;
default:
break;
}
return result;
}
//main.cc
#include "RingQueue.hpp"
#include "Task.hpp"
#include <pthread.h>
#include <sys/types.h>
#include <unistd.h>
#include <stdlib.h>
using std::cout;
using std::endl;
void* ProductorRoutine(void* args)
{
RingQueue<Task>* rq=static_cast<RingQueue<Task>*>(args);
while (true)
{
//任务1
// int data=rand()%10+1;
// rq->push(data);
// cout<<"生产完成"<<data<<endl;
// sleep(1);
//任务2
//获取任务
int x=rand() % 10 ;
int y=rand() % 5;
char op=oper[rand()%oper.size()];
Task t(x,y,op,mymath);
//生产任务
rq->push(t);
//输出提示
cout<<"生产者派发了一个任务: "<<t.toTaskString()<<endl;
sleep(1);
}
}
void* ComsumerRoutine(void* args)
{
RingQueue<Task>* rq=static_cast<RingQueue<Task>*>(args);
while (true)
{
//任务1
// int data;
// rq->pop(&data);
// cout<<"消费完成"<<data<<endl;
//任务2
Task t;
//消费任务
rq->pop(&t);
std::string result=t();
cout<<"生产者派发了一个任务: "<<result<<endl;
}
}
int main()
{
srand((size_t)time(0)^getpid()^0x22113121);
RingQueue<Task>* rq=new RingQueue<Task>();
pthread_t c,p;
pthread_create(&p,nullptr,ProductorRoutine,rq);
pthread_create(&c,nullptr,ComsumerRoutine,rq);
pthread_join(p,nullptr);
pthread_join(c,nullptr);
return 0;
}
代码逻辑
main函数创建两个线程,生产者线程负责生产出任务参数并构建任务对象,再把任务添加到RingQueue里面;消费者线程负责将任务从RingQueue里面取出来并调用Task的仿函数计算任务并打印
四、多生产多消费情形
无论是阻塞队列还是环形队列,只要保证最终进入临界区的是一个生产,一个消费就行,所以需要在环形队列对push与pop加锁。
需要加两把锁,一个是生产线程的锁,一个是消费线程的锁,你拿你的,我拿我的
#include "RingQueue.hpp"
#include "Task.hpp"
#include <pthread.h>
#include <sys/types.h>
#include <unistd.h>
#include <stdlib.h>
using std::cout;
using std::endl;
std::string SelfName()
{
char name[128];
snprintf(name,sizeof name,"thread[0x%x]",pthread_self());
return name;
}
void* ProductorRoutine(void* args)
{
RingQueue<Task>* rq=static_cast<RingQueue<Task>*>(args);
while (true)
{
//任务1
// int data=rand()%10+1;
// rq->push(data);
// cout<<"生产完成"<<data<<endl;
// sleep(1);
//任务2
//获取任务
int x=rand() % 10 ;
int y=rand() % 5;
char op=oper[rand()%oper.size()];
Task t(x,y,op,mymath);
//生产任务
rq->push(t);
//输出提示
cout<< SelfName()<<",派发了一个任务: "<<t.toTaskString()<<endl;
sleep(1);
}
}
void* ComsumerRoutine(void* args)
{
RingQueue<Task>* rq=static_cast<RingQueue<Task>*>(args);
while (true)
{
//任务1
// int data;
// rq->pop(&data);
// cout<<"消费完成"<<data<<endl;
//任务2
Task t;
//消费任务
rq->pop(&t);
std::string result=t();
cout<< SelfName()<<",消费了一个任务: "<<result<<endl;
}
}
int main()
{
srand((size_t)time(0)^getpid()^0x22113121);
RingQueue<Task>* rq=new RingQueue<Task>();
//多生产多消费
pthread_t p[5],c[10];
for(int i=0;i<5;i++) pthread_create(p+i,nullptr,ProductorRoutine,rq);
for(int i=0;i<10;i++) pthread_create(c+i,nullptr,ComsumerRoutine,rq);
for(int i=0;i<5;i++) pthread_join(p[i],nullptr);
for(int i=0;i<10;i++) pthread_join(c[i],nullptr);
delete rq;
return 0;
}
#pragma once
#include <iostream>
#include <cassert>
#include <vector>
#include <semaphore.h>
static const int gcap=5;
template<class T>
class RingQueue
{
private:
void P(sem_t& sem)
{
int n=sem_wait(&sem);
assert(n==0);
(void)n;
}
void V(sem_t& sem)
{
int n=sem_post(&sem);
assert(n==0);
(void)n;
}
public:
RingQueue(const int& cap=gcap)
:queue_(cap),cap_(cap)
{
//初始化信号量
int n=sem_init(&spacesem_,0,cap_);
assert(n==0);
n=sem_init(&datasem_,0,0);
assert(n==0);
pStep_=cStep_=0;
pthread_mutex_init(&pmutex_,nullptr);
pthread_mutex_init(&cmutex_,nullptr);
}
//生产者
void push(const T& in)
{
P(spacesem_);
//注意!把锁加在信号量里面
pthread_mutex_lock(&pmutex_);
queue_[pStep_++]=in;
pStep_%=cap_;//避免越界
pthread_mutex_unlock(&pmutex_);
V(datasem_);
}
//消费者
void pop(T* out)
{
P(datasem_);
pthread_mutex_lock(&cmutex_);
*out=queue_[cStep_++];
cStep_%=cap_;
pthread_mutex_unlock(&cmutex_);
V(spacesem_);
}
~RingQueue()
{
sem_destroy(&spacesem_);
sem_destroy(&datasem_);
pthread_mutex_destroy(&pmutex_);
pthread_mutex_destroy(&cmutex_);
}
private:
std::vector<T> queue_;
int cap_;
sem_t datasem_;//消费者--数据资源
sem_t spacesem_;//生产者--空间资源
int pStep_;//生产者数组下标
int cStep_;//消费者数组下标
pthread_mutex_t pmutex_;
pthread_mutex_t cmutex_;
};
关于加锁问题,可以在信号量之前加锁也可以再其之后加锁,在其之前加锁就导致线程申请不到锁只能等待,而在其之后加锁可以保证线程先去申请信号量,如果申请失败就在信号量处阻塞,信号量申请成功再去申请锁,提高效率。
五、小结
5.1 多生产多消费的意义
上述多线程代码中,最少有一个线程进入队列,最多有两个线程进入,那么多生产多消费的意义在哪?
不管是环形队列还是阻塞队列,多生产多消费的意义在于获取任务、处理任务是要花时间的,不仅仅是放进去、拿出来就行了。所以多生产多消费的时候的意义在于可以在生产之前与消费之后让线程并行执行。
5.2 条件变量与信号量
条件变量是一种用于线程同步和通信的高级机制。条件变量需要和互斥量一起使用,来保证线程同步和避免竞态条件。
信号量是一种用于多线程间同步的基本机制。它是一个计数器,用于控制多个线程对共享资源的访问。
如果想让一个线程完整的访问共享资源,可以使用条件变量。但是如果需要多个线程并发访问共享资源的不同区域,则可以使用信号量