目录
一、信号量
1.1 信号量的概念
1.2 信号量的函数接口
1.2.1 sem_init()
1.2.2 sem_destroy()
1.2.3 sem_wait()(P操作申请信号量)
1.2.4 sem_post()(V操作增加信号量)
二、环形队列
2.1 环形队列概念
2.2 环形队列操作
三、模拟实现
3.1 环形队列在生产消费模型作用
3.2 信号量P&V操作在生产消费模型中作用
3.3 多消费者多生产者需要几把锁?
3.4 CircularQueue.hpp
3.5 Main.cc
一、信号量
1.1 信号量的概念
信号量是一个用于进程间同步和互斥的机制。它是一个整数值,用于表示特定资源的可用性。简单理解,信号量是一种计数器,它的数值代表着资源的个数,取资源信号量--,称之为P操作,放资源信号量++,简称为V操作。
举个生活中的例子,信号量相当于电影院的电影票,资源是座位。通过申请信号量(购买电影票)提前预定资源(座位)。
1.2 信号量的函数接口
头文件:<semaphore.h>
1.2.1 sem_init()
用于初始化信号量变量。函数原型为:
int sem_init(sem_t *sem, int pshared, unsigned int value)。
此函数将未命名的信号量初始化为sem指向的地址,参数value指定了初始值(资源的多少)。其中,pshared参数指示信号量是在进程间共享(pshared非零)还是在线程间共享(pshared为0)。
1.2.2 sem_destroy()
用于销毁(释放)信号量变量。函数原型为:
int sem_destroy(sem_t *sem)
。此函数会销毁sem指向的地址处的未命名信号量。
1.2.3 sem_wait()(P操作申请信号量)
用于“P”操作,即等待信号量值大于0。函数原型为:
int sem_wait(sem_t *sem)。没有申请成功会阻塞等待
1.2.4 sem_post()(V操作增加信号量)
函数原型为:
int sem_post(sem_t *sem);
。此函数会增加信号量sem的值。
二、环形队列
2.1 环形队列概念
环形队列可以是逻辑上是首尾相连队列,实际是一段数组空间!step通过%=数组长度实现环形。
2.2 环形队列操作
环形队列的创建和使用包括以下步骤:
- 创建一个定长的数组,作为环形队列的存储空间。
- 定义两个指针,一个指向队列的首部(front),另一个指向队列的尾部(rear)。
- 当需要入队(加入元素到队列)时,将元素添加到 rear 指针指向的位置,并将 rear 指针向队尾方向移动一位;当 rear 指针到达队列末尾时,将其重置为 0。
- 当需要出队(从队列中移除元素)时,将 front 指针指向的元素移除,并将 front 指针向队尾方向移动一位;当 front 指针到达队列末尾时,将其重置为 0。
三、模拟实现
3.1 环形队列在生产消费模型作用
环形队列中存放资源,2种角色生产者消费者在环形队列中移动。队列的长度是信号量的数组大小。生产者放数据,消费者拿数据!两者只有在最开始或者生产者生产数据过快放满数据,两者才会在同一个位置!其他任何情况,两者不在同一位置,各自执行自己的任务!通过step%=数组长度实现循环!
3.2 信号量P&V操作在生产消费模型中作用
因为信号量操作的原子性,当申请信号量成功--》生产者有空间放数据 & 消费者有数据可拿
否则,二者就要阻塞等待。这样就可以先让两种角色申请信号量,申请成功后在互斥进行具体处理
操作!这样不需要条件变量同步有无资源,因为信号量申请成功就代表了已经获取了资源!这样可
以缩短临界区的线度,提高了线程的执行效率。
还要考虑一个问题,对于消费者,生产者两者都要有一个信号量,生产者的初始信号量是空余空间的大小!消费者的信号量初始是0,代表数据的多少。两者首先对自己的信号量进行P操作申请,然后处理自己的任务后,完成后同步对方的信号量进行V操作。
3.3 多消费者多生产者需要几把锁?
前面的生产者消费者在环形队列下有两个指针,两者不在同一位置情况下,互补干扰!那么多生产者相互竞争一把锁,抢到锁的生产者在自己的位置放数据;同样,消费者一把锁,抢到锁的消费者拿数据,然后再处理自己位置的数据。也就是说生产者们一把锁,消费者们一把锁!
3.4 CircularQueue.hpp
#include <iostream>
#include <vector>
#include <semaphore.h>
#include <assert.h>
template <class T>
class CircularQueue
{
public:
static const int gcap = 10;
CircularQueue(const int cap = gcap) : _cap(cap)
{
sem_init(&_spaceSem, 0, _cap); // 生产者初始空间容量val=_cap
sem_init(&_dataSem, 0, 0); // 消费者初始空间容量 val = 0
pthread_mutex_init(&_pmutex, nullptr);
pthread_mutex_init(&_cmutex, nullptr);
_queue.resize(_cap);
}
void push(const T &in)
{
P(_spaceSem);
// 往后说明有位置放数据
pthread_mutex_lock(&_pmutex);
_queue[_pstep++] = in;
_pstep %= _cap;
pthread_mutex_unlock(&_pmutex);
//同步消费者
V(_dataSem);
}
void pop(T *out)
{
P(_dataSem);
//往后说明有数据处理
pthread_mutex_lock(&_cmutex);
*out = _queue[_cstep++];
_cstep %= _cap;
pthread_mutex_unlock(&_cmutex);
//同步生产者
V(_spaceSem);
}
~CircularQueue()
{
sem_destroy(&_spaceSem);
sem_destroy(&_dataSem);
pthread_mutex_destroy(&_pmutex);
pthread_mutex_destroy(&_cmutex);
}
private:
//sem_wait封装成P操作
void P(sem_t &sem)
{
int n = sem_wait(&sem);
assert(n == 0);
(void)n;
}
//sem_post封装成V操作
void V(sem_t &sem)
{
int n = sem_post(&sem);
assert(n == 0);
(void)n;
}
private:
std::vector<T> _queue;//逻辑上环形队列,物理上数组
int _cap;
sem_t _spaceSem; // 生产者看重空间资源
sem_t _dataSem; // 消费中看重消费资源
int _pstep = 0;
int _cstep = 0;
//生产者们&消费者们各自一把锁
pthread_mutex_t _pmutex;
pthread_mutex_t _cmutex;
};
3.5 Main.cc
#include"CircularQueue.hpp"
#include<unistd.h>
#include<sys/types.h>
#include<pthread.h>
std::string GetName()
{
char name[64];
snprintf(name,sizeof(name),"thread[0x%x]",pthread_self());
return name;
}
//生产者任务就是放入一个整数
void* Producer(void* args)
{
CircularQueue<int>* cq =static_cast<CircularQueue<int>*>(args);
while(true)
{
int task = rand() % 5 + 1;
cq->push(task);
std::cout <<GetName()<<" 生产任务: " <<task<<std::endl;
sleep(1);
}
}
//消费者不做数据处理,只是拿到数据后打印
void* Consumer(void* args)
{
CircularQueue<int>* cq =static_cast<CircularQueue<int>*>(args);
while(true)
{
int ret = 0;
cq->pop(&ret);
std::cout <<GetName()<<" 消费任务: " << ret <<std::endl;
sleep(1);
}
}
int main()
{
srand((unsigned int)time(nullptr) ^ getpid());
CircularQueue<int>* cq = new CircularQueue<int>();
//多生产者,多消费者
pthread_t c[4],p[8];
for(int i =0;i<4;i++)
pthread_create(c+i,nullptr,Producer,cq);
for(int i =0;i<8;i++)
pthread_create(p+i,nullptr,Consumer,cq);
for(int i=0;i<4;i++)
pthread_join(c[i],nullptr);
for(int i =0;i<8;i++)
pthread_join(p[i],nullptr);
delete cq;
return 0;
}
代码结果展示: