文章目录
- 信号量
- 信号量的接口
- 初始化
- 销毁
- 等待信号量
- 发布信号量
- 环形队列
- 结合信号量设计模型
- 实现基于环形队列的生产者消费者模型
- Task.hpp
- RingQueue.hpp
- main.cc
- 效果
- 对于多生产多消费的情况
信号量
信号量的本质是一个计数器
首先一份公共资源在实际情况中可能存在不同的线程可以并发访问不同的区域。因此当一个线程想要访问临界资源时,可以先申请信号量,只要信号量申请成功就代表着这个线程在未来一定能访问临界资源的一部分。而这个申请信号量的本质就是对临界资源中特定的小块资源进行预定机制
所有的线程都能看到信号量,也就代表着信号量的本身就是公共资源。如果信号量申请失败,说明该线程并不满足条件变量则线程进入阻塞等待。
信号量的操作可以称为 PV操作,申请信号量成功则信号量的值减1(P),归还资源后信号量的值加1(V)。在这过程中一定要保证原子性
信号量的接口
信号量的类型为:sem_t
初始化
定义一个信号量并初始化:
#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsing int value);
参数一为信号量的地址
参数二如果设为0则表示线程间共享,非0则表示进程间共享
参数三为信号量的初始值,也就是这个计数器的最大值
初始化成功返回0
销毁
int sem_destroy(sem_t *sem);
参数为信号量的地址
等待信号量
也就是申请信号量,等待成功则信号量的值减1
int sem_wait(sem_t *sem);
参数为信号量的地址
发布信号量
也就是归还信号量,发布成功则信号量的值加1
int sem_post(sem_t *sem);
参数为信号量的地址
环形队列
将生产者消费者模型放到环形队列里该如何理解呢?
首先环形队列可以理解为就是指一个循环的数组。而对于生产者和消费者而言,生产者负责往这个数组中放入数据,消费者负责从这个数组中拿取数据。
对于生产者而言:只有环形队列中有空的空间时才可以放数据
对于消费者而言:环形队列中必须要有不为空的空间时才能拿数据
那么肯定是生产者放入数据后消费者才能拿数据,因此在唤醒队列中消费者永远都不能超过生产者
假如生产者和消费者都指向了同一块空间时,要么队列满了,要么队列为空
如果队列满了,那么消费者就必须要先运行拿走数据后生产者才可以运行
如果队列为空,那么生产者必须放入数据后消费者才能运行拿数据
如果为其他情况,说明生产者和消费者所指向的空间是不一样的
结合信号量设计模型
那么根据上述的消费者和生产者不同的特性,结合信号量为两者设计条件
对于生产者而言看中的是队列中剩余的空间,那么可以给生产者设置信号量为队列的总容量
对于消费者而言看中的是队列中不为空的空间,则可以给消费者初始的信号量值设为0
那么对于生产过程而言:
- 首先生产者去申请信号量也就是 P 操作
- 申请成功则往下继续生产操作的执行,申请失败则阻塞等待
- 执行完生产操作后,V操作,注意此时的V操作并不是对生产者的信号量操作,而是对消费者的信号量操作。因为此时队列中已经有了不为空的空间,所以消费者的信号量就可以进行加1操作,这样消费者才能申请成功信号量
对于消费过程而言:
- 消费者申请信号量
- 申请成功则往下继续消费操作的执行,失败则阻塞等待
- 执行完消费操作后,V操作,注意此时V操作是对生产者的信号量操作,因为消费完成后,队列里就多了一个空的空间,生产者的信号量就可以进行加1操作
那么对于生产者和消费者的位置其实就是队列中的下标,并且一定是有两个下标,如果队列为空为满,那么两者的位置相同
实现基于环形队列的生产者消费者模型
Task.hpp
#include <iostream>
#include <string>
#include <functional>
#include <cstdio>
// 负责计算的任务类
class CPTask
{
// 调用的计算方法,根据传入的字符参数决定
typedef std::function<int(int, int, char)> func_t;
public:
CPTask()
{
}
CPTask(int x, int y, char op, func_t func)
: _x(x), _y(y), _op(op), _func(func)
{
}
// 实现传入的函数调用
std::string operator()()
{
int count = _func(_x, _y, _op);
// 将结果以自定义的字符串形式返回
char res[2048];
snprintf(res, sizeof res, "%d %c %d = %d", _x, _op, _y, count);
return res;
}
// 显示出当前传入的参数
std::string tostring()
{
char res[1024];
snprintf(res, sizeof res, "%d %c %d = ", _x, _op, _y);
return res;
}
private:
int _x;
int _y;
char _op;
func_t _func;
};
// 负责计算的任务函数
int Math(int x, int y, char c)
{
int count;
switch (c)
{
case '+':
count = x + y;
break;
case '-':
count = x - y;
break;
case '*':
count = x * y;
break;
case '/':
{
if (y == 0)
{
std::cout << "div zero" << std::endl;
count = -1;
}
else
count = x / y;
break;
}
default:
break;
}
return count;
}
class SCTask
{
// 获取保存数据的方法
typedef std::function<void(std::string)> func_t;
public:
SCTask()
{
}
SCTask(const std::string &str, func_t func)
: _str(str), _func(func)
{
}
void operator()()
{
_func(_str);
}
private:
std::string _str;
func_t _func;
};
RingQueue.hpp
#pragma once
#include <iostream>
#include <pthread.h>
#include <vector>
#include <semaphore.h>
#include <string>
#include <ctime>
#include <unistd.h>
#include <cassert>
using namespace std;
template <class T>
class RingQueue
{
public:
RingQueue(const int size = 10)
: _size(size)
{
// 初始化信号量以及数组大小
// 生产者的信号量初始为数组大小
// 消费者的信号量初始为0
_Rqueue.resize(_size);
assert(sem_init(&_ps, 0, _size) == 0);
assert(sem_init(&_cs, 0, 0) == 0);
}
void push(const T &in)
{
// 生产者申请信号量
assert(sem_wait(&_ps) == 0);
// 为了模拟环形,下标需要模等于数组的大小
// 保证下标不越界
_Rqueue[_Pindex++] = in;
_Pindex %= _size;
// 消费者发布信号量
assert(sem_post(&_cs) == 0);
}
void pop(T *out)
{
// 消费者申请信号量
assert(sem_wait(&_cs) == 0);
// 为了模拟环形,下标需要模等于数组的大小
// 保证下标不越界
*out = _Rqueue[_Cindex++];
_Cindex %= _size;
// 生产者发布信号量
assert(sem_post(&_ps) == 0);
}
~RingQueue()
{
sem_destroy(&_ps);
sem_destroy(&_cs);
}
private:
vector<T> _Rqueue;
int _size; // 数组的容量大小
sem_t _ps; // 生产者的信号量
sem_t _cs; // 消费者的信号量
int _Pindex = 0; // 生产者的下标
int _Cindex = 0; // 消费者的下标
};
main.cc
#include "RingQueue.hpp"
#include "Task.hpp"
void *Pplay(void *rp)
{
RingQueue<CPTask> *rq = (RingQueue<CPTask> *)rp;
while (1)
{
string s("+-*/");
// 随机产生数据插入
int x = rand() % 100 + 1;
int y = rand() % 100 + 1;
// 随机提取+-*/
int i = rand() % s.size();
// 定义好实现类的对象
CPTask c(x, y, s[i], Math);
// 将整个对象插入到计算队列中
rq->push(c);
cout << "生产数据完成: " << c.tostring() << endl;
sleep(1);
}
}
void *Cplay(void *cp)
{
RingQueue<CPTask> *rq = (RingQueue<CPTask> *)cp;
while (1)
{
// 队列拿出数据
CPTask c;
rq->pop(&c);
// 调用拿出的对象获取最终的结果
string s = c();
cout << "消费完成,取出的数据为: " << s << endl;
sleep(2);
}
}
int main()
{
srand(time(nullptr));
RingQueue<CPTask> *rq = new RingQueue<CPTask>();
pthread_t p, c;
pthread_create(&p, nullptr, Pplay, (void *)rq);
pthread_create(&c, nullptr, Cplay, (void *)rq);
pthread_join(p, nullptr);
pthread_join(c, nullptr);
delete rq;
return 0;
}
效果
对于多生产多消费的情况
如果现在有多个生产和多个消费,那么就要保证临界资源的安全,这时候就得加锁。
加锁可以实现在申请信号量之后,因为申请信号量实际上就是一个原子性的操作,并不会因为多线程而导致冲突。当线程申请好了各自的信号量之后再往下运行加锁,就不用让所有线程都等待在加锁前。而解锁同样可以在发布信号量之后。