目录
前言
一、信号量
二、信号量的接口
1.初始化
2.销毁
3.申请信号量
4. 释放信号量
三、基于环形队列的生产者消费者模型
1.环形队列的理解
2.生产者消费者的设计
3.单消费者单生产者环形队列的实现
4.多消费者多生产者环形队列的实现
前言
之前,我们学习了线程互斥与线程同步与生产者消费者模型,了解了互斥锁能够很好的保护公共资源,但是之前我们都是将公共资源整体来使用,如果我们可以将公共资源拆分为N份,让线程访问之前先申请信号量,申请到我就将其中一份资源给你,你自己处理,这样操作不会影响到其他线程访问自己的那一块公共资源。提高了效率。
一、信号量
信号量的本质是一把计数器
申请信号量就是预定资源
信号量的PV(申请、释放)操作是原子的
现在我们不将公共资源当成整体,而是将他拆分为好几份,多线程也不再访问临界资源的同一个区域,而是访问自己申请的那一块区域,就能支持多线程并发访问。
比如现在有5份资源,也就是说最理想的状况,能让5个进程一起访问这五份资源,那么我们只需要控制好,不要让第六个线程来访问资源就好。
那么我们知道,信号量的本质就是计数器,并且是原子的,那么我可以让线程先去申请信号量,来判断是否有你能访问的资源,没有就申请失败,有就让你可以访问这块资源。最后访问完释放资源。
同时,当我们申请信号量成功,就证明有一份资源被我预定了,那么我们也无需再次判断该资源是否准备就绪,直接使用即可。
二、信号量的接口
1.初始化
sem_init
- 作用:初始化信号量
- 参数1:sem,需要初始化哪个信号量
- 参数2:pshared,是否需要在进程间共享(0表示线程间共享,非0表示在进程间共享)
- 参数3:value,定义的初始值,代表最多让几个线程进入。
2.销毁
sem_destroy
- 作用:销毁一个信号量
- 参数:sem,代表要销毁哪个信号量。
3.申请信号量
sem_wait
- 作用:申请信号量
- 参数:sem,代表申请哪个信号量。
如果申请失败,会阻塞在wait处。
sem_trywait(非阻塞式申请信号量)
sem_timedwait(根据按照时间进行申请,在时间内申请成功就返回继续执行,超过时间还没申请成功也返回错误代码)
4. 释放信号量
sem_post
- 作用:释放信号量
- 参数:sem,释放的是哪个信号量
三、基于环形队列的生产者消费者模型
1.环形队列的理解
现在我们使用上面的信号量接口,来写一个基于环形队列的生产者消费者模型。
说是一个环形队列,其实本质上就是一个数组,从头放到尾部,如果当n大于了数组的长度,我们就让n%=v.size(),这样n又会回到索引0的位置。就像环一样滚动起来了。
2.生产者消费者的设计
既然是生产者消费者模型,我们就得让生产者去生产数据并往环形队列里面放入,消费者从环形队列中拿取数据。
如果消费者不进行消费,生产者最多能往队列里面生产 n 份数据。也就是消费者被生产者刚好超了一圈。如下生产者生产了一圈,消费者一直没有消费,此时生产者就不能再生产了,因为这会造成数据覆盖,必须让消费者去消费之后再生产。
同理,消费者也最多赶上生产者,就不能再消费了,因为数据已经被消费者消费完了,生产者还没来得及生产。
那么,生产者和消费者只有两种情况下会在同一个位置,要么为空,要么未满。
为空,应该让生产者去生产,为满,应该让消费者去消费。
但是为空或者为满只是占了实际生产消费情况的少部分,更多的情况并没有那么极端。因此我们只需要偶尔进行维持就可以了,这样就能让生产者和消费者同时进行操作了。
对于生产者来讲,空间是资源,对于消费者来讲,数据是资源。
因此我们就需要有两个信号量
- 生产者需要空间资源(sem_space),默认初始化为N,每次生产者生产,就让N-1,每次消费者消费,就让N+1,如果N为0,代表不能再生产,只能消费。
- 消费者需要数据资源(sem_data),默认初始化为0,每次生产者生产,都让数据资源+1,消费者消费,就让数据资源-1,如果数据资源为0,代表只能生产,不能消费。
生产者伪代码:
P(sem_space)//申请空间信号量 sem_space - 1
//进行生产
V(sem_data) //释放数据信号量 sem_data + 1
消费者伪代码:
P(sem_data) //申请数据信号量 sem_data - 1
//进行消费
V(sem_space) //释放空间信号量 sem_space + 1
3.单消费者单生产者环形队列的实现
代码部分并不难,都是上面的逻辑,不多赘述。
RingQueue.hpp
#pragma once
#include<iostream>
#include<vector>
#include<semaphore.h>
using namespace std;
const static int queue_size = 5;
template<class T>
class RingQueue
{
private:
void P(sem_t &sem)
{
sem_wait(&sem);
}
void V(sem_t &sem)
{
sem_post(&sem);
}
public:
RingQueue(int size = queue_size)
:_ringqueue(size)
,_size(size)
,_p_step(0)
,_c_step(0)
{
sem_init(&_space_sem,0,_size);
sem_init(&_data_sem,0,0);
}
void Push(const T& in)
{
P(_space_sem);
_ringqueue[_p_step] = in;
_p_step++;
_p_step %= _size;
V(_data_sem);
}
void Pop(T* out)
{
P(_data_sem);
*out = _ringqueue[_c_step];
_c_step++;
_c_step %= _size;
V(_space_sem);
}
~RingQueue()
{
sem_destroy(&_space_sem);
sem_destroy(&_data_sem);
}
private:
vector<T> _ringqueue;
int _size;
int _p_step;
int _c_step;
sem_t _space_sem;
sem_t _data_sem;
};
Main.cc
#include "RingQueue.hpp"
#include <pthread.h>
#include <unistd.h>
void* Productor(void* args)
{
RingQueue<int>* rq = static_cast<RingQueue<int>*> (args);
int cnt = 100;
while(true)
{
rq->Push(cnt);
cout<<"生产成功,数据为: "<<cnt--<<endl;
}
}
void* Consumer(void* args)
{
RingQueue<int>* rq = static_cast<RingQueue<int>*> (args);
while (true)
{
sleep(1);
int data = 0;
rq->Pop(&data);
cout<<"消费成功,数据为: "<<data<<endl;
}
}
int main()
{
pthread_t p,c;
RingQueue<int> *rq = new RingQueue<int>();
pthread_create(&c,nullptr,Productor,rq);
pthread_create(&p,nullptr,Consumer,rq);
pthread_join(p,nullptr);
pthread_join(c,nullptr);
}
运行结果如下,由于我们让消费者每隔一秒进行消费,因此打印内容为生产满后,消费一个就生产一个。因为我们维持好了信号量,所以不用担心生产和消费会越阶。
4.多消费者多生产者环形队列的实现
现在我们想让有多个消费者和多个生产者一起去操作。虽然我们确实预先申请了信号量,但是环形队列的实现生产有一个位置,消费有一个位置。(环形队列并没有实现拆分公共资源)
他们必须要保持该位置只有同一时间一个线程进入,因此需要添加两把互斥锁来保证消费者的互斥和生产者的互斥。
两把锁的目的是让生产者和消费者一起进行,如果只有一把锁就同一时间只能有一个进行。
我们代码部分也比较简单啊,定义锁,初始化锁,加锁与解锁,销毁锁就可以了。
同时,我们创建好多个消费者和生产者,让他们运行起来。
运行发现,成功进行多生产多消费,速度是要比之前学习的阻塞队列快。因为他能让生产者和消费者同时进行访问,阻塞队列只能要么生产者生产,要么消费者消费。
最后附上总代码
RingQueue.hpp
#pragma once
#include <iostream>
#include <vector>
#include <semaphore.h>
#include <pthread.h>
#include "LockGuard.hpp"
using namespace std;
const static int queue_size = 5;
template <class T>
class RingQueue
{
private:
void P(sem_t &sem)
{
sem_wait(&sem);
}
void V(sem_t &sem)
{
sem_post(&sem);
}
public:
RingQueue(int size = queue_size)
: _ringqueue(size), _size(size), _p_step(0), _c_step(0)
{
sem_init(&_space_sem, 0, _size);
sem_init(&_data_sem, 0, 0);
pthread_mutex_init(&_c_mutex, nullptr);
pthread_mutex_init(&_p_mutex, nullptr);
}
void Push(const T &in)
{
P(_space_sem);
// pthread_mutex_lock(&_p_mutex);
{
LockGuard(&_p_mutex);
_ringqueue[_p_step] = in;
_p_step++;
_p_step %= _size;
}
// pthread_mutex_unlock(&_p_mutex);
V(_data_sem);
}
void Pop(T *out)
{
P(_data_sem);
// pthread_mutex_lock(&_c_mutex);
{
LockGuard(&_c_mutex);
*out = _ringqueue[_c_step];
_c_step++;
_c_step %= _size;
}
// pthread_mutex_unlock(&_c_mutex);
V(_space_sem);
}
~RingQueue()
{
sem_destroy(&_space_sem);
sem_destroy(&_data_sem);
pthread_mutex_destroy(&_p_mutex);
pthread_mutex_destroy(&_c_mutex);
}
private:
vector<T> _ringqueue;
int _size;
int _p_step;
int _c_step;
sem_t _space_sem;
sem_t _data_sem;
pthread_mutex_t _p_mutex;
pthread_mutex_t _c_mutex;
};
LockGuard.hpp
#pragma once
#include <pthread.h>
// 不定义锁,外部会传递锁
class Mutex
{
public:
Mutex(pthread_mutex_t *lock)
: _lock(lock)
{
}
void Lock()
{
pthread_mutex_lock(_lock);
}
void UnLock()
{
pthread_mutex_unlock(_lock);
}
~Mutex()
{
}
private:
pthread_mutex_t *_lock;
};
class LockGuard
{
public:
LockGuard(pthread_mutex_t *lock)
: _mutex(lock)
{
_mutex.Lock();
}
~LockGuard()
{
_mutex.UnLock();
}
private:
Mutex _mutex;
};
Main.cc
#include "RingQueue.hpp"
#include <pthread.h>
#include <unistd.h>
void* Productor(void* args)
{
RingQueue<int>* rq = static_cast<RingQueue<int>*> (args);
int cnt = 100;
while(true)
{
rq->Push(cnt);
cout<<"生产成功,数据为: "<<cnt--<<endl;
}
}
void* Consumer(void* args)
{
RingQueue<int>* rq = static_cast<RingQueue<int>*> (args);
while (true)
{
int data = 0;
rq->Pop(&data);
cout<<"消费成功,数据为: "<<data<<endl;
}
}
int main()
{
pthread_t p[2],c[2];
RingQueue<int> *rq = new RingQueue<int>();
pthread_create(&p[0],nullptr,Productor,rq);
pthread_create(&c[0],nullptr,Consumer,rq);
pthread_create(&p[1],nullptr,Productor,rq);
pthread_create(&c[1],nullptr,Consumer,rq);
pthread_join(p[0],nullptr);
pthread_join(c[0],nullptr);
pthread_join(p[1],nullptr);
pthread_join(c[1],nullptr);
}