文章目录
- 前言
- 一、基于环形队列的生产者消费者模型的实现
前言
上一篇文章我们讲了信号量的几个接口和基于环形队列的生产者消费者模型,下面我们就快速来实现。
一、基于环形队列的生产者消费者模型的实现
首先我们创建三个文件,分别是makefile,RingQueue.hpp,以及main.cc。我们先简单搭建一下环形队列的框架:
#pragma once
#include <iostream>
#include <vector>
#include <semaphore.h>
static const int gcap = 5;
template <class T>
class RingQueue
{
public:
RingQueue(const int& cap = gcap)
:_queue(_cap)
,_cap(cap)
{
}
private:
std::vector<T> _queue;
int _cap; //表示环形队列的容量
sem_t _spaceSem; // 代表空间资源的信号量
sem_t _dataSem; // 代表数据资源的信号量
};
首先我们的环形队列为了能存放任意类型的数据,所以直接用了模板参数。然后我们的环形队列的底层是vector,我们用cap这个变量表示环形队列的容量,然后我们之前提到过生产者关心的是空间资源,消费者关心的是数据资源,所以我们有两个信号量来分别表示,使用信号量的头文件是
semaphore.h,然后我们看看信号量初始化的接口:
可以看到信号量初始化的接口有三个参数,我们都在上一篇文章讲过,pshared代表是否要共享(0就是线程间共享,非0就是进程间共享),Value是信号量的初始值。这个接口返回0代表成功,-1说明失败。
所以我们就可以对信号量这样初始化:
RingQueue(const int& cap = gcap)
:_queue(cap)
,_cap(cap)
{
int n = sem_init(&_spaceSem,0,_cap);
if (n==-1)
{
cout<<"_spaceSem信号量异常"<<endl;
}
n = sem_init(&_dataSem,0,0);
if (n==-1)
{
cout<<"_dataSem信号量异常"<<endl;
}
}
我们一开始空间信号量肯定是队列空间容量,因为该开始没有数据占用空间。而开始的时候生产者并没有生产所以数据的信号量为0.
下面我们再写析构函数:
当我们程序退出的时候,要提前释放信号量的资源,所以我们看一下sem_destroy接口:
~RingQueue()
{
int n = sem_destroy(&_spaceSem);
if (n==-1) cout<<"sem_destroy(&_spaceSem)失败"<<endl;
n = sem_destroy(&_dataSem);
if (n==-1) cout<<"sem_destroy(&_dataSem)失败"<<endl;
}
写好了环形队列的框架后,我们再大致写一下main.cc的代码:
#include "RingQueue.hpp"
#include <pthread.h>
void *ProductorRoutine(void* rq)
{
RingQueue<int>* ringq = static_cast<RingQueue<int>*>(rq);
while (true)
{
//
}
}
void *ConsumerRoutine(void* rq)
{
RingQueue<int>* ringq = static_cast<RingQueue<int>*>(rq);
while (true)
{
//
}
}
int main()
{
RingQueue<int>* rq = new RingQueue<int>();
pthread_t p,c;
pthread_create(&p,nullptr,ProductorRoutine,rq);
pthread_create(&c,nullptr,ConsumerRoutine,rq);
pthread_join(p,nullptr);
pthread_join(c,nullptr);
delete rq;
return 0;
}
首先我们创建两个线程,一个代表生产者,一个代表消费者。我们让其线程调用回调函数的时候传入我们写的环形队列,然后结束前让主线程等待两个线程,最后释放掉环形队列的资源。两个回调函数分别是为生产者和消费者服务的,我们用安全的类型转换接收一开始传过来的环形队列指针,然后我们继续编写环形队列的代码。
现在我们需要考虑如何从环形队列中安全的拿出数据,所以我们需要有一个Push接口:
void Push(const T& in)
{
}
void pop(T* out)
{
}
当我们想要写push接口的时候才发现,我们生产的时候还要让信号量++--,所以我们还需要有P操作和V操作的接口:
再写这两个接口前我们还需要认识sem_wait接口:
这个接口可以让我们的信号量-1,对应的是P操作。
sem_post接口可以让信号量+1,代表V操作。
void P(sem_t& sem)
{
int n = sem_wait(&sem);
if (n==-1) cout<<"sem_wait失败"<<endl;
}
void V(sem_t& sem)
{
int n = sem_post(&sem);
if (n==-1) cout<<"sem_wait失败"<<endl;
}
下面我们开始写push和pop接口:
我们每次生产都会有一个位置,所以我们直接用一个变量来表示生产者的位置,再用另一个变量表示消费者的位置。
void Push(const T& in)
{
//生产者生产前需要预定空间
P(_spaceSem);
_queue[productorStep++] = in;
productorStep%=_cap;
//生产完成后数据多了一个
V(_dataSem);
}
我们刚开始生产的时候需要预定空间,所以我们要对空间信号量做P操作,然后在队列中生产者的位置放上生产的数据,这里让productorStep后置++就找到了下一次的位置。然后我们要保证生产者的位置不越界所以每次都要%上队列的容量,然后我们生产完成后数据就变多了,所以需要对数据进行V操作。
void pop(T* out)
{
//消费前需要预定消费资源
P(_dataSem);
*out = _queue[consumerStep++];
consumerStep%=_cap;
//消费后空间变多
V(_spaceSem);
}
消费者消费前同样要看有没有数据资源,如果没有就阻塞在P操作,如果有就将消费的资源给out,然后为了防止消费者的位置越界需要%cap,并且消费者消费完成后空间会多出来。
下面我们完成以下main函数中的代码:
首先我们种一个随机数种子,为了防止数据重复我们^了pid和线程ID,然后我们的任务就是简单的数字:
void *ProductorRoutine(void* rq)
{
RingQueue<int>* ringq = static_cast<RingQueue<int>*>(rq);
while (true)
{
int data = rand()%10 + 1;
ringq->Push(data);
std::cout<<"生产完成,生产的数据是: "<<data<<std::endl;
}
}
void *ConsumerRoutine(void* rq)
{
RingQueue<int>* ringq = static_cast<RingQueue<int>*>(rq);
while (true)
{
int data;
ringq->pop(&data);
std::cout<<"消费完成,消费的数据是: "<<data<<std::endl;
}
}
生产一个随机数然后push到我们的环形队列,然后打印生产完成。消费的时候直接去循环队列那数据然后打印即可,下面我们运行起来:
运行后我们可以看到是没有问题的,当然我们也可以生产前sleep()一下,不然运行太快了。
下面我们将上次写的生产任务的文件复制过来,然后让这个环形队列去完成加减乘除的任务。
下面是任务的代码:
class Task
{
public:
Task()
{
}
Task(int x,int y,char op)
:_x(x)
,_y(y)
,_op(op)
,_result(0)
,_exitCode(0)
{
}
void operator()()
{
switch(_op)
{
case '+':
_result = _x + _y;
break;
case '-':
_result = _x - _y;
break;
case '*':
_result = _x * _y;
break;
case '/':
if (_y==0)
{
_exitCode = -1;
}
else
{
_result = _x / _y;
}
break;
case '%':
if (_y==0)
{
_exitCode = -2;
}
else
{
_result = _x % _y;
}
break;
default:
break;
}
}
std::string formatArg()
{
return std::to_string(_x) +_op +std::to_string(_y) + "=";
}
std::string formatRes()
{
return std::to_string(_result) + "(" + std::to_string(_exitCode) + ")";
}
~Task()
{
}
private:
int _x;
int _y;
char _op;
int _result;
int _exitCode;
};
思路还是和我们阻塞队列的时候一样,直接将环形队列的类型改为Task即可:
注意我们在消费者的函数中调用仿函数()可以直接完成加减乘除的运算。
下面我们运行起来看看效果:
可以看到程序运行是没问题的,每个任务都被完成了。
刚刚我们思考的是单生产单消费模型,下面我们思考一下如果改动代码实现多生产多消费模型呢?
其实很简单,我们的消费者和生产者是完全独立的,也就是说生产者不生产只要队列有数据消费者依旧可以消费。即使消费者不消费,只要队列里有空间,那么生产者就可以生产,既然他们两个不构成互斥关系,那么我们就可以定义两把锁,一把锁去锁生产者,另一把锁去锁消费者。
下面我们就直接在Push和Pop接口中加锁了:
然后我们创建多个线程:
int main()
{
srand((unsigned int)time(nullptr)^getpid()^pthread_self());
RingQueue<Task>* rq = new RingQueue<Task>();
pthread_t p[4],c[8];
for (int i = 0;i<4;i++)
{
pthread_create(p+i,nullptr,ProductorRoutine,rq);
}
for (int i = 0;i<8;i++)
{
pthread_create(c+i,nullptr,ProductorRoutine,rq);
}
for (int i = 0;i<4;i++)
{
pthread_join(p[i],nullptr);
}
for (int i = 0;i<8;i++)
{
pthread_join(c[i],nullptr);
}
delete rq;
return 0;
}
可以看到我们确实完成了多生产多消费的例子。但是我们目前的代码还有一些小问题,实际上我们的信号量本来就是原子的不需要加锁操作,我们正确的写法应该是下面这样:
void Push(const T& in)
{
//生产者生产前需要预定空间
P(_spaceSem);
pthread_mutex_lock(&_pmutex);
_queue[productorStep++] = in;
productorStep%=_cap;
pthread_mutex_unlock(&_pmutex);
//生产完成后数据多了一个
V(_dataSem);
}
void pop(T* out)
{
//消费前需要预定消费资源
P(_dataSem);
pthread_mutex_lock(&_cmutex);
*out = _queue[consumerStep++];
consumerStep%=_cap;
pthread_mutex_unlock(&_cmutex);
//消费后空间变多
V(_spaceSem);
}
以上就是基于环形队列实现生产者消费者模型的全部内容了,这部分的代码很容易写错,一不小心就会出问题,所以大家写代码的时候一定要细心,而且linux环境下不太好调试。