前言
这是之前基于阻塞队列的生产消费模型中Enqueue的代码
void Enqueue(const T &in) // 生产者用的接口
{
pthread_mutex_lock(&_mutex);
while(IsFull())//判断队列是否已经满了
{
pthread_cond_wait(&_product_cond, &_mutex); //满的时候就在此情况下等待
// 1. pthread_cond_wait调用是: a. 让调用进程等待 b. 自动释放曾经持有的_mutex锁
}
// 进行生产
_bq.push(in);
// 通知消费者来消费
pthread_cond_signal(&_consumer_cond);
pthread_mutex_unlock(&_mutex);
}
缺点:
当一个线程往阻塞队列中插入时,必须要满足一个条件"临界资源还没满",否则就需要放到条件变量的等待队列中去。
而判断临界资源是否为满需要先申请锁(检测临界资源的本质也是在访问临界资源),然后再进入临界区访问临界资源,才能判断临界资源是否为满。
那么只要我们对临界资源整体加锁,就默认会对这个临界资源整体使用(吗?)。实际上可能是:一份临界资源被划分为多个不同的区域,而且运行多个线程同时访问不同的区域。
在访问临界资源之前,我们无法知道临界资源的情况。
多个线程不能同时访问临界资源的不同区域。
1.信号量
1.1信号量的概念
我们之前在学进程间通信时,简单介绍过信号量。
信号量:信号量的本质其实是一计数器,这一计数器的作用是用来描述临界资源中资源数量的多少
申请信号量的本质其实就是:对临界资源中特定的小块资源的预定机制。(资源不一定被我持有,才是我的,只要我预定了,在未来的某个时间,就是我的)
信号量也是一种互斥量,只要申请到信号量的线程,在未来一定能够拥有一份临界资源。
假如要让多个线程同时去访问一块划分为n个区域的临界资源:
创建一个信号量,值为n
每来一个访问临界资源的线程都要先去申请信号量(信号量的值,n--),申请后才能访问
当n被减到0时说明临界资源中各个区域都有线程在访问资源,其它想要访问临界资源的线程就得阻塞等待,等这n个区域中的某个线程访问完将这个区域空出来才行(信号量的值,n++)
信号量解决了上面提到的问题:
线程不用访问临界资源就能知道资源的使用情况 (信号量申请成功就一定有资源可以使用,申请失败则说明条件不满足,只能阻塞等待)
注意:所有线程都得能看到信号量,信号量是一个公共资源,涉及到线程安全问题
信号量的基本操作就是对信号量进行++或--,而这两个操作时原子的
P操作:信号量--,就是在申请资源(此操作必须时原子的)
V操作:信号量++,返回资源(此操作也需是原子的)
1.2信号量的接口
信号量的使用需要引头文件:semaphore.h;还需要链接原生线程库-pthread
sem_t sem;//创建信号量
初始化信号量
man sem_init
参数:
sem:信号量指针
pshared:0表示线程间共享,非0表示进程间共享。(一般情况下为0)
value:信号量初始值,也就是计数器的值
返回值:类型int,成功返回0,失败返回-1,并将 errno 设置为指示错误
申请信号量,P操作,计数器--
man sem_wait
参数
sem:信号量指针
返回值:成功返回0,失败返回-1,设置errno
发布信号量,V操作,计数器++
man sem_post
参数
sem:信号量指针
返回值:成功返回0,失败返回-1,设置errno
信号量销毁
man sem_destroy
参数
sem:信号量指针
返回值:成功返回0,失败返回-1,设置errno
2.基于环形队列的生产者消费者模型
POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源的目的,但是POSIX可用于线程间同步
2.1分析
环形队列
这里的环形队列用数组来模拟,取模来模拟其环状特性
当环形队列为空时,头尾都指向同一个位置;当环形队列为满时,头尾也指向同一个位置。这样不好判断为空或为满,可以通过加计数器或者标记位来判断满或空,也可以预留一个空位,作为满状态
但是我们现在有信号量这个计数器,就很简单的进行多线程间的同步过程
单生产和单消费两线程在访问环形队列时,生产者负责向环形队列中生产数据,消费者负责从环形队列中消费数据。
那么生产者和消费者什么时候会访问同一个位置呢?当环形队列为空/为满的时候
那么如果环形队列一定不为空&&一定不为满时,生产者、消费者的下标指向不是同一个位置
生产和消费的动作可以真正并发吗?是的
环形队列的生产者消费者模型还需要两个必要条件:
1.生产者不能把消费者超过一个圈以上(当环形队列满了后,生产者继续生产,那么生产者生产的数据会覆盖消费者还未消费的,消费者就无法消费被覆盖的数据了)
2.消费者不能超过生产者(生产者还没生产,消费者无法消费。当消费者超过生产者时,消费者访问的区域无数据)
对于生产者而言,它最关心的是空间
空间资源可以定义一个信号量,用来统计空闲空间的个数
对于消费者而言,它最关心的是数据
数据资源也可以用一个信号量来统计数据个数
所以生产者每次访问临界资源之前,需要先申请空间资源的信号量,申请到才可以进行生产,不然就得老实的阻塞等待
消费者也一样,访问临界资源之前,要先申请数据资源的信号量,申请成功才能够去消费数据,不然还是阻塞等待
空间资源信号量的申请(P)由生产者进行,归还(V)由消费者进行
数据资源信号量的申请(P)由消费者进行,归还(V)由生产者进行
伪代码
生产者
P(room);//申请空间资源
//信号量申请成功,继续向下运行;失败则阻塞
ringbuffer[p_index] = x;
p_index++;
p_index%=10;
V(data);
消费者
P(data);//申请数据资源
//信号量申请成功——数据资源一定存在
out = ringbuffer[c_index];
c_index++;
c_index%=10;\
V(room);
2.2代码
ringqueue.hpp
#include <iostream>
#include <string>
#include <vector>
#include <semaphore.h>
#include <pthread.h>
template<typename T>
class RingQueue
{
private:
void P(sem_t &sem)//申请信号量P操作
{
sem_wait(&sem);
}
void V(sem_t &sem)//发布信号量V操作
{
sem_post(&sem);
}
void Lock(pthread_mutex_t &mutex)//加锁
{
pthread_mutex_lock(&mutex);
}
void UnLock(pthread_mutex_t &mutex)//解锁
{
pthread_mutex_unlock(&mutex);
}
public:
RingQueue(int cap)
:_rq(cap)
,_cap(cap)
,_productor_step(0)
,_consumer_step(0)
{
sem_init(&_room_sem,0,_cap);
sem_init(&_data_sem,0,0);
pthread_mutex_init(&_productor_mutex,nullptr);
pthread_mutex_init(&_consumer_mutex,nullptr);
}
void Enqueue(const T&in)
{
P(_room_sem);
Lock(_productor_mutex);
//开始生产
_rq[_productor_step] = in;
_productor_step %= _cap;//环形队列
UnLock(_productor_mutex);
V(_data_sem);
}
void Pop(T *out)
{
P(_data_sem);
Lock(_consumer_mutex);
//消费
*out = _rq[_consumer_step];
_consumer_step++;
_consumer_step%=_cap;
UnLock(_consumer_mutex);
V(_room_sem);
}
~RingQueue()
{
sem_destroy(&_room_sem);
sem_destroy(&_data_sem);
pthread_mutex_destroy(&_productor_mutex);
pthread_mutex_destroy(&_consumer_mutex);
}
private:
std::vector<T> _rq;
int _cap;
int _productor_step;//生产者步数
int _consumer_step;//消费者步数
//定义信号量
sem_t _room_sem;//空间信号量,生产者关心
sem_t _data_sem;//数据信号量,消费者关心
//锁
pthread_mutex_t _productor_mutex;
pthread_mutex_t _consumer_mutex;
};
thread.hpp
#include<iostream>
#include<signal.h>
#include<unistd.h>
#include<functional>
#include<pthread.h>
namespace Thread_Module
{
template <typename T>
using func_t = std::function<void(T&)>;
// typedef std::function<void(const T&)> func_t;
template <typename T>
class Thread
{
public:
void Excute()
{
_func(_data);
}
public:
Thread(func_t<T> func,T &data,const std::string &threadname = "none")
:_threadname(threadname)
,_func(func)
,_data(data)
{}
static void* threadrun(void *args)//线程函数
{
Thread<T> *self = static_cast <Thread<T>*>(args);
self->Excute();
return nullptr;
}
bool Start()//线程启动!
{
int n = pthread_create(&_tid,nullptr,threadrun,this);
if(!n)//返回0说明创建成功
{
_stop = false;//说明线程正常运行
return true;
}
else
{
return false;
}
}
void Stop()
{
_stop = true;
}
void Detach()//线程分离
{
if(!_stop)
{
pthread_detach(_tid);
}
}
void Join()//线程等待
{
if(!_stop)
{
pthread_join(_tid,nullptr);
}
}
std::string threadname()//返回线程名字
{
return _threadname;
}
~Thread()
{}
private:
pthread_t _tid;//线程tid
std::string _threadname;//线程名
T &_data;//数据
func_t<T> _func;//线程函数
bool _stop; //判断线程是否停止 为true(1)停止,为false(0)正常运行
};
}
main.cc
#include "ringqueue.hpp"
#include "Thread.hpp"
#include<string>
#include<vector>
#include<unistd.h>
#include<functional>
#include<pthread.h>
using namespace Thread_Module;
void* consumer(RingQueue<int> &rq)
{
while(true)
{
int data;
rq.Pop(&data);
std::cout<<"消费一个数据:"<<data<<std::endl;
sleep(1);
}
}
void* productor(RingQueue<int> &rq)
{
int a = 1;
while(true)
{
rq.Enqueue(a);
std::cout<<"生产一个数据 :"<<a<<std::endl;
a++;
}
}
void Comm(std::vector<Thread<RingQueue<int>>> *threads,int num,RingQueue<int> &rq,func_t<RingQueue<int>> func)
{
for(int i=0;i<num;i++)
{
std::string name = "thread-"+std::to_string(i+1);
threads->emplace_back(func,rq,name);
}
}
void ProductorStart(std::vector<Thread<RingQueue<int>>> *threads,int num,RingQueue<int> &rq)
{
Comm(threads,num,rq,productor);
}
void ConsumerStart(std::vector<Thread<RingQueue<int>>> *threads,int num,RingQueue<int> &rq)
{
Comm(threads,num,rq,consumer);
}
void StartAll(std::vector<Thread<RingQueue<int>>> &threads)
{
for(auto &thread:threads)
{
std::cout<<"Start:"<<thread.threadname()<<std::endl;
thread.Start();
}
}
void WaitAllThread(std::vector<Thread<RingQueue<int>>> &threads)
{
for(auto &thread:threads)
{
thread.Join();
}
}
int main()
{
RingQueue<int> *rq = new RingQueue<int>(10);
std::vector<Thread<RingQueue<int>>> threads;
ProductorStart(&threads,1,*rq);
ConsumerStart(&threads,2,*rq);
StartAll(threads);
WaitAllThread(threads);
return 0;
}