文章目录
- POSIX信号量
- 初始化信号量
- 销毁信号量
- 等待信号量(P操作)
- 发送信号量(V操作)
- 基于环形队列的生产消费模型
- 设计思路
- 代码实现
POSIX信号量
POSIX和System V一样,都是unix下的一套管理方法,下面介绍POSIX标准下的信号量。 POSIX信号量和System
V信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源的目的。除此之外,POSIX信号量可以控制多个线程或者进程对共享资源的访问以达到线程同步的目的。更具体的,信号量是一个资源计数器:当信号量的计数大于0时表示还有资源可以使用,当计数等于0时,表示资源不可以用,需要等待资源。为了便于理解,我们可以认为信号量其实是一个无符号整型,大于0时线程就可以进入临界区访问临界资源并且计数减一,表示可用的资源又少了。直到线程访问资源结束释放资源计数加一。
下面介绍关于操作信号量的函数使用
初始化信号量
#include <semaphore.h>
// 无名信号量初始化
int sem_init(sem_t *sem, int pshared, unsigned int value);
// 命名信号量打开
sem_t *sem_open(const char *name, int oflag, mode_t mode, unsigned int value);
对于无名信号量:
sem
:指向信号量对象的指针pshared
:为0表示在线程间共享,否则表示在进程之间共享value
:初始化信号量的值
对于命名信号量:
name
:信号量的名称必须以斜杆/
开头oflag
:用于指定打开信号量的标志O_CREAT
:信号量不在就创建一个O_EXCL
:如果同时指定了O_CREAT并且信号量已存在,则打开失败
mode
:用于设置新创建信号量的权限(类似文件权限)。只有在指定了O_CREAT标志时才使用。权限码和文件的权限码对应value
:初始化信号量的值
销毁信号量
// 销毁无名信号量
int sem_destroy(sem_t *sem);
// 关闭命名信号量
int sem_close(sem_t *sem);
// 取消命名信号量链接
int sem_unlink(const char *name);
sem
表示一个指向信号量对象的指针name
表示命名信号名字
等待信号量(P操作)
等待信号量(也称为P操作)会将信号量的值减一,如果信号量的值为零,调用线程将被阻塞,直到信号量的值大于零
int sem_wait(sem_t *sem);
sem
:指向信号量对象的指针
发送信号量(V操作)
发送信号量(也称为V操作)会将信号量的值加一,如果有任何线程在等待该信号量,它们将被唤醒。
int sem_post(sem_t *sem);
sem
:指向信号量对象的指针
基于环形队列的生产消费模型
之前的生产者消费者模型是基于阻塞队列queue来实现的,其空间可以动态分配,依靠互斥锁和条件变量来实现同步机制。现在基于固定大小的的环形队列使用信号量机制重写这个程序,即实现生产者和消费者的同步访问资源。点击了解环形队列。
同样的,整个模型有1个数据缓冲区即环形队列,2个角色即生产者和消费者,3种关系即生产者和生产者、生产者和消费者、消费者和消费者之间得到关系。
设计思路
下面介绍该模型的设计思路
对于环形队列的基本结构
head
指针:指向下一个将被消费的位置,即数据出队tail
指针:指向下一个被生产的位置,即数据入队
判断队列是否为空和是否为满:
假设整个环形队列要存N个数据,为了方便判断,我们多开一个数据空间不存储数据。如果tail==head,表示初始状态即队列为空。如果 (tail+1)%(N+1)==head 则表示队列为满。为空时,消费者等待,生产者被唤醒。为满时生产者等待,消费者被唤醒。
同步机制:
为了保证多线程环境下的安全操作,需要使用到信号量和互斥锁。大致策略为:设置两种信号量,一种表示可用资源数,一种表示空闲空间数。为什么还要设计一个信号量表示空闲空间数呢?这是为了避免生产者和消费者的忙等待。 此外,在PV操作时需要使用互斥锁。
对于忙等待作出具体解释:
如果设计单信号量表示数据个数,生产者生成数据并尝试插入到缓冲区。如果缓冲区已满,生产者将释放互斥锁并继续检查缓冲区状态,直到有空闲空间。这种反复检查状态的行为就是忙等待。
代码实现
- ringqueue.hpp文件,声明定义了循环队列模板。
#pragma once
#include <iostream>
#include <pthread.h>
#include <vector>
#include <semaphore.h>
using namespace std;
template <class T>
class RingQueue
{
private:
void P(sem_t &s)
{
sem_wait(&s);
}
void V(sem_t &s)
{
sem_post(&s);
}
public:
RingQueue(const size_t max_cap) // 构造环形队列
: _ringqueue(max_cap), _max_cap(max_cap), _c_step(0), _p_step(0)
{
sem_init(&_data_sem, 0, 0); // 初始化信号量
sem_init(&_emp_sem, 0, _max_cap);
pthread_mutex_init(&_p_mutex, nullptr);
pthread_mutex_init(&_c_mutex, nullptr);
}
void Push(const T &in)
{ // 生产者
P(_emp_sem); // 申请空间
pthread_mutex_lock(&_p_mutex);
_ringqueue[_p_step] = in;
_p_step++;
_p_step = _p_step % _max_cap;
pthread_mutex_unlock(&_p_mutex);
V(_data_sem); // 释放数据
}
void Front(T *out) // 消费者
{
P(_data_sem); // 申请数据
pthread_mutex_lock(&_c_mutex);
*out = _ringqueue[_c_step];
_c_step++;
_c_step = _c_step % _max_cap;
pthread_mutex_unlock(&_c_mutex);
V(_emp_sem); // 释放空间
}
~RingQueue() // 析构销毁资源
{
sem_destroy(&_data_sem);
sem_destroy(&_emp_sem);
pthread_mutex_destroy(&_p_mutex);
pthread_mutex_destroy(&_c_mutex);
}
private:
vector<T> _ringqueue;
int _max_cap;
int _c_step; // 消费者指针的位置
int _p_step; // 生产者指针的位置
sem_t _data_sem; // 表示数据个数
sem_t _emp_sem; // 表示空闲空间个数
pthread_mutex_t _p_mutex;
pthread_mutex_t _c_mutex;
};
- main.cpp 文件,主函数,定义了多个线程,模拟实现多消费者多生产者访问临界区的情况
#include "RingQueue.hpp"
#include <unistd.h>
using namespace std;
void *Consumer(void *arg)
{
RingQueue<int> *rq = static_cast<RingQueue<int> *>(arg);
while (true)
{
int data = 0;
rq->Front(&data);
cout << "生产者获得数据->" << data << endl;
sleep(1);
}
}
void *Produce(void *arg)
{
RingQueue<int> *rq = static_cast<RingQueue<int> *>(arg);
while (true)
{
int data = rand() % 100;
rq->Push(data);
cout << "生产者生产了数据->" << data << endl;
sleep(1);
}
}
int main()
{
pthread_t t1, t2, t3, t4;
srand(time(nullptr));
RingQueue<int> *rq = new RingQueue<int>(10);
pthread_create(&t1, nullptr, Consumer, rq);
pthread_create(&t2, nullptr, Produce, rq);
pthread_create(&t3, nullptr, Consumer, rq);
pthread_create(&t4, nullptr, Produce, rq);
pthread_join(t1, nullptr);
pthread_join(t2, nullptr);
pthread_join(t3, nullptr);
pthread_join(t4, nullptr);
return 0;
}
- 运行结果