Linux —— POSIX信号量 - 基于环形队列的生产消费模型
- POSIX信号量
- 信号量的概念
- POSIX信号量的类型
- 信号量的操作
- POSIX信号量函数
- 基于环形队列的生产消费模型
- 设计思路
- 同步和安全性
- 代码
POSIX信号量
信号量的概念
POSIX信号量是一种用于进程和线程之间同步的机制,主要用于控制对共享资源的访问。信号量是一种同步原语,通常表现为一个整数值,表示可用资源的数量。信号量的值不小于0。如果一个进程尝试将信号量的值减少到小于0,操作将被阻塞,知道信号量的值增加到允许的范围。
POSIX信号量的类型
POSIX信号量主要分为两种类型:
- 命名信号量:
- 具有一个名称,可以通过该名称在不同的进程间访问。命名信号量通常用于进程间的同步。
- 使用
sem_open()
函数创建或打开命名信号量
2.未命名信号量: - 不具有名称,通常存在于内存中,适用于同一进程内的多个进程之间的同步。
- 未命名信号量可以通过
sem_init()
函数进行初始化。
信号量的操作
信号量的基本操作包括:
- P操作(等待操作):使用
sem_wait()
函数实现,尝试减少信号量的值。如果信号量的值为0,则调用线程将被阻塞,直到信号量的值大于0。 - V操作(释放操作):使用
sem_post()
函数实现,增加信号量的值,通知其他等待的线程或进程信号量的可用性。
POSIX信号量函数
POSIX信号量提供了一组函数来创建、操作和销毁信号量。以下是一些常用的POSIX信号量函数及其参数和返回值:
- sem_init
功能: 初始化未命名信号量。
原型:
int sem_init(sem_t *sem, int pshared, unsigned int value);
参数:
sem
: 指向信号量对象的指针。pshared
: 如果为0,则信号量仅用于线程间同步;如果非0,则信号量可用于进程间同步。
value: 信号量的初始值。
返回值:
- 成功时返回0;失败时返回-1,并设置errno以指示错误类型。
- sem_destroy
功能: 销毁信号量。
原型:
int sem_destroy(sem_t *sem);
参数:
- ·sem·: 指向要销毁的信号量对象的指针。
返回值: - 成功时返回0;失败时返回-1,并设置errno。
- sem_wait
功能: 进行 P 操作(等待操作),尝试获取信号量。
原型:
int sem_wait(sem_t *sem);
参数:
sem
: 指向信号量对象的指针。
返回值:
- 成功时返回0;失败时返回-1,并设置errno。如果信号量的值为0,调用线程将被阻塞。
- sem_post
- 功能: 进行V操作(释放操作),增加信号量的值。
- 原型:
int sem_post(sem_t *sem);
参数:
sem
: 指向信号量对象的指针。
返回值:
- 成功时返回0;失败时返回-1,并设置errno。
基于环形队列的生产消费模型
下面通过STL的vector来设计一个环形队列,环形队列采用数组模拟,用模运算来模拟环状特性:
环形结构起始状态和结束状态都是一样的,不好判断为空或者为满,所以可以通过加计数器或者标记位来
判断满或者空。另外也可以预留一个空的位置,作为满的状态
但是我们现在有信号量这个计数器,就很简单的进行多线程间的同步过程
环形队列实际上就是一个链表,只是在理解上理解为一个环形的:
设计思路
- 使用环形队列:
- 环形队列是一种循环使用固定大小内存的数据结构,非常适合生产者-消费者模型。
- 通过维护生产者指针
(_p_step)
和消费者指针(_c_step)
,可以实现对环形队列的环形访问。
- 使用信号量:
- 使用两个信号量:
_data_sem
(数据信号量)和_space_sem(
空间信号量)。 _data_sem
表示队列中可用数据的数量,初始值为0。_space_sem
表示队列中可用空间的数量,初始值为队列的最大容量(_max_cap)
。
- 使用互斥量:
- 使用两个互斥量:
_p_mutex
(生产者锁)和_c_mutex
(消费者锁)。 - 互斥量用于保护生产者和消费者对队列的并发访问。
同步和安全性
- 生产者:
- 生产者首先使用 P
(_space_sem)
申请可用空间,如果没有可用空间,会被阻塞。 - 获取
_p_mutex
锁,保护对队列的写入操作。 - 将数据写入队列,并更新生产者指针
_p_step
。 - 释放
_p_mutex
锁。 - 使用 V
(_data_sem)
发布一个数据可用信号。
- 消费者:
- 消费者首先使用 P
(_data_sem)
申请可用数据,如果没有可用数据,会被阻塞。 - 获取
_c_mutex
锁,保护对队列的读取操作。 - 从队列中读取数据,并更新消费者指针
_c_step
。 - 释放
_c_mutex
锁。 - 使用 V
(_space_sem)
发布一个空间可用信号。
- 同步和安全性:
- 信号量确保了生产者和消费者对队列的访问是同步的。
- 生产者在有可用空间时才能写入,消费者在有可用数据时才能读取。
- 互斥量确保了生产者和消费者对队列的并发访问是安全的。
- 每个线程在访问队列时都会获取相应的互斥量,保证了临界区的互斥访问。
- 其他考虑:
- 在构造函数中初始化信号量和互斥量。
- 在析构函数中销毁信号量和互斥量。
- 确保在异常情况下,信号量和互斥量也能被正确销毁。
代码
- RingQueue.hpp 环形队列的实现
#pragma once
#include <iostream>
#include <vector>
#include <string>
#include <pthread.h>
#include <semaphore.h>
template <typename T>
class RingQueue
{
private:
void P(sem_t &s)
{
sem_wait(&s);
}
void V(sem_t &s)
{
sem_post(&s);
}
public:
RingQueue(int max_cap) : _ringqueue(max_cap), _max_cap(max_cap), _c_step(0), _p_step(0)
{
sem_init(&_data_sem, 0, 0);
sem_init(&_space_sem, 0, max_cap);
pthread_mutex_init(&_c_mutex, nullptr);
pthread_mutex_init(&_p_mutex, nullptr);
}
void Push(const T &in) // 生产者
{
// 信号量本身就是一种资源预约机制,无需判断,即可知道资源的内部情况
P(_space_sem);
pthread_mutex_lock(&_p_mutex);
_ringqueue[_p_step] = in;
_p_step++;
_p_step %= _max_cap;
pthread_mutex_unlock(&_p_mutex);
V(_data_sem);
}
void Pop(T *out) // 消费者
{
P(_data_sem);
pthread_mutex_lock(&_c_mutex);
*out = _ringqueue[_c_step];
_c_step++;
_c_step %= _max_cap;
pthread_mutex_unlock(&_c_mutex);
V(_space_sem);
}
~RingQueue()
{
sem_destroy(&_data_sem);
sem_destroy(&_space_sem);
pthread_mutex_destroy(&_c_mutex);
pthread_mutex_destroy(&_p_mutex);
}
private:
std::vector<T> _ringqueue;
int _max_cap;
int _c_step;
int _p_step;
sem_t _data_sem; // 消费者 数据信号量
sem_t _space_sem; // 生产者 空间信号量
pthread_mutex_t _c_mutex;
pthread_mutex_t _p_mutex;
};
- Task.hpp Task类的实现
#pragma once
#include <iostream>
class Task
{
public:
Task()
{}
Task(int x , int y):_x(x),_y(y)
{
}
~Task()
{}
void Excute()
{
_result = _x + _y;
}
std::string Debug()
{
std::string msg = std::to_string(_x) + " + " + std::to_string(_y) + " = " + " ? ";
return msg;
}
std::string Result()
{
std::string msg = std::to_string(_x) + " + " + std::to_string(_y) + " = " + std::to_string(_result);
return msg;
}
void operator()()
{
Excute();
}
private:
int _x;
int _y;
int _result;
};
- main.cc 代码上层的调用逻辑
#include "RingQueue.hpp"
#include "Task.hpp"
#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <ctime>
void *Consumer(void*args)
{
RingQueue<Task> *rq = static_cast<RingQueue<Task> *>(args);
while(true)
{
Task t;
// 1. 消费
rq->Pop(&t);
// 2. 处理数据
t();
std::cout << "Consumer-> " << t.Result() << std::endl;
}
}
void *Productor(void*args)
{
RingQueue<Task> *rq = static_cast<RingQueue<Task> *>(args);
while(true)
{
sleep(1);
// 1. 构造数据
int x = rand() % 10 + 1; //[1, 10]
usleep(x*1000);
int y = rand() % 10 + 1;
Task t(x, y);
// 2. 生产
rq->Push(t);
std::cout << "Productor -> " << t.Debug() << std::endl;
}
}
int main()
{
srand(time(nullptr) ^ getpid());
RingQueue<Task> *rq = new RingQueue<Task>(5);
// 单单
pthread_t c1, c2, p1, p2, p3;
pthread_create(&c1, nullptr, Consumer, rq);
pthread_create(&c2, nullptr, Consumer, rq);
pthread_create(&p1, nullptr, Productor, rq);
pthread_create(&p2, nullptr, Productor, rq);
pthread_create(&p3, nullptr, Productor, rq);
pthread_join(c1, nullptr);
pthread_join(c2, nullptr);
pthread_join(p1, nullptr);
pthread_join(p2, nullptr);
pthread_join(p3, nullptr);
return 0;
}