文章目录
- 信号量概念
- POSIX 信号量
- 基于环形队列的生产者消费者模型
- 基于环形队列的生产者消费者模型编码实现
- 基于环形队列的生产者消费者模型发送任务测试
信号量概念
信号量是一种用于多线程或多进程间同步的机制;
其定义是一个整形变量,本质上信号量可以看成是一个计数器,用来描述临界资源的资源数量;
通过信号量可以使多个执行流进行同步控制;
信号量主要通过P
,V
两个操作用来进行对信号量的控制:
-
P
操作该操作用于减少信号量的值;
当信号量不为
0
时一个执行流申请临界资源即为P
操作;对应信号量将会
-1
;当信号量为
0
时其余执行流试图再次进行P
操作时将会被阻塞,直至其中一个或多个执行流进行了V
操作后信号量不为0
时将会把该阻塞的执行流唤醒; -
V
操作该操作用于增加信号量的值;
当一个执行流对对应的临界资源访问结束时将释放该信号量,即为
V
操作;对应信号量将会
+1
;
其中PV
操作是被设计成具有原子性,确保执行流在进行P
操作或是V
操作时不可被其他执行流打断;
信号量的主要作用为:
-
互斥访问
保护临界资源,确保同一时间只有一个执行流访问临界资源;
-
同步
协调不同执行流的执行顺序;
-
资源统计
管理有限的临界资源数量;
信号量的类型有两种,分别为 “二元信号量” 和 “计数信号量” ;
-
二元信号量
二元信号量的值只能为
0
与1
;当信号量的值为
0
时表示不可访问该临界资源,值为1
时表示允许一个执行流访问临界资源; -
计数信号量
技术信号量可以是任意非负整数;
其中当信号量的值为
0
时表示不可访问该临界资源,值为非负整数且非零时表示允许该数值个数的执行流访问临界资源;
执行流对信号量的申请成功即P
操作不代表该执行流访问了其临界资源,而是表示该执行流存持有对该临界资源访问的许可;
而获得访问该临界资源的许可是访问该临界资源的前提;
-
信号量与临界资源状态
当一个执行流成功进行了
P
操作后不再需要对临界资源状态进行判断;信号量是用来描述临界资源数量的,当信号量不为
0
时即表示该临界资源状态为就绪状态;这意味着申请信号量的本质就是间接判断了临界资源状态是否就绪的过程;
POSIX 信号量
POSIX
标准定义了一个信号量sem
;
该信号量通常包含在<semaphore.h>
头文件中,该信号量一般与 POSIX
互斥锁 pthread
一同使用,故在包含<semaphore.h>
头文件时需包含<pthread.h>
头文件;
-
信号量的定义
POSIX
信号量是一个类型为sem_t
的结构体变量,其对应的结构可能类似于如下:typedef struct { int value; pthread_mutex_t lock; pthread_cond_t cond; // 可能还有其他字段 } sem_t;
在使用该信号量前必须用该类型定义一个
sem_t
类型的变量或对象,如:sem_t sem;
通过一系列接口控制信号量的操作,包括初始化,销毁,
P
操作,V
操作等; -
初始化信号量
通常使用
sem_init()
函数初始化信号量变量;NAME sem_init - initialize an unnamed semaphore SYNOPSIS #include <semaphore.h> int sem_init(sem_t *sem, int pshared, unsigned int value); Link with -pthread. RETURN VALUE sem_init() returns 0 on success; on error, -1 is returned, and errno is set to indicate the error.
该函数用于初始化一个未命名的信号量,创建一个可以用于多线程同步的信号量对象;
-
sem_t *sem
该参数为指向要初始化的信号量对象指针;
-
int pshared
该参数指定信号量的共享性质;
如果该参数为
0
则表示信号量在进程间的线程之间共享;如果该参数为非
0
则表示信号量在进程之间共享(非所有系统都支持); -
unsigned int value
该参数用于指定信号量的初始值;
该函数调用成功返回
0
,调用失败返回-1
并设置errno
,通常可能的errno
值为:-
EINVAL
表示
value
超过了信号量的最大允许值; -
ENOSYS
表示系统不支持进程共享的信号量(
pshared
为非0
); -
EPERM
表示没有权限初始化信号量;
-
-
销毁信号量
通常调用
sem_destroy()
函数销毁信号量;NAME sem_destroy - destroy an unnamed semaphore SYNOPSIS #include <semaphore.h> int sem_destroy(sem_t *sem); Link with -pthread. RETURN VALUE sem_destroy() returns 0 on success; on error, -1 is returned, and errno is set to indicate the error.
该函数用于清理和释放无名信号量的资源,通常在程序结束或是不再需要改信号量时调用;
其中参数
sem_t *sem
表示传入一个指向要销毁的信号量的指针;该函数调用成功时返回
0
,调用失败返回-1
并设置errno
来指示错误信息,该函数调用失败可能出现的errno
为:-
EINVAL
表示所传入信号量不是有效的信号量;
-
EBUSY
表示有线程正在等待这个信号量;
该函数只能用于销毁通过
sem_init()
初始化的无名信号量,无法用于销毁命名信号量(sem_open()
创建的信号量);同时若是使用该函数销毁一个正在使用的信号量可能会导致未定义行为;
销毁后的信号量不能再被使用,除非重新调用
sem_init()
初始化; -
-
P
操作通常使用
sem_wait()
函数进行P
操作;NAME sem_wait SYNOPSIS #include <semaphore.h> int sem_wait(sem_t *sem); Link with -pthread. Feature Test Macro Requirements for glibc (see feature_test_macros(7)): sem_timedwait(): _POSIX_C_SOURCE >= 200112L || _XOPEN_SOURCE >= 600 RETURN VALUE All of these functions return 0 on success; on error, the value of the semaphore is left unchanged, -1 is returned, and errno is set to indicate the error.
该函数用于等待信号量,即
P
操作;如果信号量的值大于
0
时则将其减1
并立即返回,如果信号量的值为0
时则阻塞线程,直至信号量大于0
;其中参数
sem_t *sem
表示传入一个指向要等待的信号量的指针;函数调用成功时返回
0
,调用失败时返回-1
并设置errno
,其中常见的错误为:-
EINTR
表示等待被信号处理程序中断;
-
EINVAL
表示传入的信号量
sem
不是有效的信号量;
同样的
P
操作相关的函数有:int sem_trywait(sem_t *sem); int sem_timedwait(sem_t *sem, const struct timespec *abs_timeout);
-
-
V
操作通常使用
sem_post()
函数进行V
操作;NAME sem_post - unlock a semaphore SYNOPSIS #include <semaphore.h> int sem_post(sem_t *sem); Link with -pthread. DESCRIPTION sem_post() increments (unlocks) the semaphore pointed to by sem. If the semaphore's value consequently becomes greater than zero, then another process or thread blocked in a sem_wait(3) call will be woken up and proceed to lock the semaphore. RETURN VALUE sem_post() returns 0 on success; on error, the value of the semaphore is left unchanged, -1 is returned, and errno is set to indicate the error.
该函数用于释放(解锁)一个信号量,即
V
操作;其中参数
sem_t *sem
表示传入一个指向要释放的信号量的指针;该函数调用成功时返回
0
,调用失败时返回-1
,同时调用失败时其信号量的值保持不变并设置errno
来指示错误原因,该函数可能的错误为:-
EINVAL
表示
sem
不是有效的信号量; -
EOVERFLOW
表示信号量的最大值将被超过;
与
P
操作相同,该操作也是一个原子性操作;如果有多个线程在等待同一个信号量时将只会唤醒其中一个线程;
若是过渡调用该函数可能导致信号量值溢出;
-
基于环形队列的生产者消费者模型
基于环形队列的生产者消费者模型本质与生产者消费者模型无异;
其包含了生产者消费者的几个要素:
-
三种关系
- 消费者与消费者的互斥关系
- 生产者与生产者的互斥关系
- 生产者与消费者的同步互斥关系
-
两个角色
-
生产者
负责生产数据并放入环形队列中,当队列满时生产者需要等待;
-
消费者
负责从环形队列中取出数据并对数据加工处理,当队列为空时,消费者需等待;
-
-
一个交易场所
一块特定结构的共享空间,在该设计中环形队列为消费者与生产者的交易场所;
其中该环形队列是一个固定大小的缓冲区,通常用数组实现,其环形结构用当前生产者或消费者的位置对该固定大小的缓冲区进行取模操作;
头尾指针循环使用形成一个逻辑上的环,该空间用于存储生产者锁生产的数据供消费者消费;
以单生产者单消费者为例,其生产者与消费者必须在环形队列中遵守三条规则:
-
生产者与消费者处于同一位置时无法同时访问
当生产者与消费者处于同一位置时表示环形队列可能处于空状态或是满状态两种状态;
-
队列为空
当队列为空时表示队列中不存在数据,此时消费者必须等待生产者生产数据;
-
队列为满
当队列为满时即生产者无法生产数据,必须阻塞等待至消费者消费一个或多个数据;
-
-
消费者无法超过生产者一圈
消费者超过生产者,在超过之前必定与生产者处于同一位置,由于消费者的位置
>
生产者意味着当前同一位置队列为空;消费者若是超过生产者则表示在队列的空位置处消费数据,将会发生错误;
-
生产者无法超过消费者一圈
生产者在超过消费者之前必定与消费者处于同一位置,此时意味着当前队列状态为满;
若是生产者在此时超过消费者则意味着在已有数据的位置再次放入数据进行覆盖,将会发生问题;
对于生产者而言其关注的资源为当前队列中可存入数据的空间;
对于消费者而言其关注的资源为当前队列中存在多少数据;
因此在使用信号量实现基于环形队列的生产者消费者模型时需要存在两个信号量来分别控制生产者和消费者所关注的资源;
即生产者对应的信号量用来描述当前队列中可存入数据的空间,消费者对应的信号量用来描述当前队列中存在的数据量;
-
多生产者多消费者下的环形队列
在该模型中生产者和生产者必须产生互斥,消费者与消费者也必须产生互斥,所以无论是单生产者单消费者还是多生产者多消费者的情况下,同一时间能够在唤醒队列进行生产消费的只有一个生产者与消费者;
基于环形队列的生产者消费者模型编码实现
该模型与基于阻塞队列的生产者消费者模型类似,存在三种关系,两种角色和一个交易场所;
唯一不同的是基于环形队列的生产者消费者模型利用信号量使得生产者与消费者能够互斥与同步;
/* RingQueue.hpp */
#ifndef RING_QUEUE_HPP
#define RING_QUEUE_HPP
#include <pthread.h>
#include <semaphore.h>
#include <vector>
// 定义一个模板类 RingQueue,可以存储任意类型 T 的数据
template <class T>
class RingQueue {
const static int defaultnum = 10; // 默认队列大小
private:
// 封装信号量的 P 操作(等待)
void P(sem_t &sem) { sem_wait(&sem); }
// 封装信号量的 V 操作(释放)
void V(sem_t &sem) { sem_post(&sem); }
// 封装互斥锁的加锁操作
void Lock(pthread_mutex_t &mutex) { pthread_mutex_lock(&mutex); }
// 封装互斥锁的解锁操作
void Unlock(pthread_mutex_t &mutex) { pthread_mutex_unlock(&mutex); }
public:
// 构造函数,初始化队列和同步原语
RingQueue(int maxcap = defaultnum)
: queue_(maxcap), maxcap_(maxcap), cstep_(0), pstep_(0) {
sem_init(&sem_data_, 0, 0); // 初始化数据信号量,初始值为0
sem_init(&sem_space_, 0, maxcap_); // 初始化空间信号量,初始值为最大容量
pthread_mutex_init(&c_lock_, nullptr); // 初始化消费者互斥锁
pthread_mutex_init(&p_lock_, nullptr); // 初始化生产者互斥锁
}
// 生产者方法:向队列中添加数据
void Push(const T &data) {
P(sem_space_); // 等待有可用空间
Lock(p_lock_); // 加锁,确保生产者之间互斥
queue_[pstep_] = data; // 将数据放入队列
pstep_ = (pstep_ + 1) % maxcap_; // 更新生产者索引,保持环形
V(sem_data_); // 增加可用数据的信号量
Unlock(p_lock_); // 解锁
}
// 消费者方法:从队列中取出数据
void Pop(T *out) {
P(sem_data_); // 等待有可用数据
Lock(c_lock_); // 加锁,确保消费者之间互斥
*out = queue_[cstep_]; // 取出数据
cstep_ = (cstep_ + 1) % maxcap_; // 更新消费者索引,保持环形
Unlock(c_lock_); // 解锁
V(sem_space_); // 增加可用空间的信号量
}
// 析构函数,清理资源
~RingQueue() {
sem_destroy(&sem_data_); // 销毁数据信号量
sem_destroy(&sem_space_); // 销毁空间信号量
pthread_mutex_destroy(&c_lock_); // 销毁消费者互斥锁
pthread_mutex_destroy(&p_lock_); // 销毁生产者互斥锁
}
private:
std::vector<T> queue_; // 存储数据的环形队列
int maxcap_; // 队列最大容量
int cstep_; // 消费者当前位置
int pstep_; // 生产者当前位置
sem_t sem_data_; // 可用数据的信号量
sem_t sem_space_; // 可用空间的信号量
pthread_mutex_t c_lock_; // 消费者互斥锁
pthread_mutex_t p_lock_; // 生产者互斥锁
};
#endif
这是一个环形队列的实现,使用模板设计,可以存储任意类型数据;
使用了信号量和互斥锁来实现线程同步与互斥:
-
sem_t sem_data_
该信号量表示队列中可用数据的数量,为消费者所关注的资源;
-
sem_t sem_space_
该信号量表示队列中可用空间的数量,为生产者所关注的资源;
-
pthread_mutex_t c_lock_
用于保持多消费者情况下消费者之间的互斥关系,确保只有一个消费者线程能够访问其对应的临界资源;
-
pthread_mutex_t p_lock_
用于保持多生产者情况下生产者之间的互斥关系,确保只有一个生产者线程能够访问其对应的临界资源;
并且将信号量的P
操作,V
操作与互斥锁的Lock
操作,Unlock
操作进行封装,以便捷使用;
其中Push()
操作为生产操作,当存在可用空间时生产者对空间信号量进行P
操作生产数据,当数据生产完毕后对资源信号量进行V
操作;
Pop()
操作为消费操作,当存在可消费资源时消费者对资源信号量进行P
操作消费数据,当数据消费完毕后对空间信号量进行V
操作;
其中Push
生产者方法逻辑如下:
- 等待可用空间(对
sem_space_
进行P
操作) - 加锁
Lock(p_lock_)
保证生产者之间保持互斥 - 将数据放入队列并更新生产者索引
p_step_
- 增加可用数据信号量(对
sem_data_
进行V
操作)
Pop
消费者方法逻辑如下:
- 等待可用资源(对
sem_data_
进行P
操作) - 加锁
Lock(c_lock_)
保证消费者之间保持互斥 - 从队列中取出数据并更新消费者索引
c_step_
- 增加可用空间信号量(对
sem_space_
进行V
操作)
其环形逻辑采用cstep_
和pstep_
来跟踪消费者和生产者的位置,并通过取模运算实现逻辑环形;
构造函数初始化所有成员和同步互斥语句(信号量,互斥锁);
析构函数负责清理所有资源(信号量,互斥锁);
以多生产者多消费者为例进行测试(内置类型数据):
/* main.cc */
#include <pthread.h>
#include <unistd.h>
#include <cstdio>
#include <cstdlib>
#include <ctime>
#include "RingQueue.hpp"
using namespace std;
// 生产者线程函数
void *Productor(void *args) {
RingQueue<int> *rq = static_cast<RingQueue<int> *>(args);
while (true) {
int data = rand() % 10; // 生成0-9的随机数
usleep(10); // 短暂休眠,模拟生产过程
rq->Push(data); // 将数据放入环形队列
printf("The thread-%3lu production a data :%2d\n", pthread_self() % 1000, data);
}
return nullptr;
}
// 消费者线程函数
void *Consumer(void *args) {
RingQueue<int> *rq = static_cast<RingQueue<int> *>(args);
while (true) {
int data = 0;
rq->Pop(&data); // 从环形队列中取出数据
printf("The thread-%3lu get a data :%2d\n", pthread_self() % 1000, data);
usleep(800000); // 休眠0.8秒,模拟消费过程
}
return nullptr;
}
int main() {
srand(time(nullptr)); // 初始化随机数种子
RingQueue<int> *rq = new RingQueue<int>(); // 创建环形队列
pthread_t p_tids[3], c_tids[3]; // 定义3个生产者和3个消费者线程
// 创建3个生产者线程
for (size_t i = 0; i < 3; ++i) {
pthread_create(&p_tids[i], nullptr, Productor, rq);
}
// 创建3个消费者线程
for (size_t i = 0; i < 3; ++i) {
pthread_create(&c_tids[i], nullptr, Consumer, rq);
}
// 等待所有生产者线程结束
for (size_t i = 0; i < 3; ++i) {
pthread_join(p_tids[i], nullptr);
}
// 等待所有消费者线程结束
for (size_t i = 0; i < 3; ++i) {
pthread_join(c_tids[i], nullptr);
}
delete rq; // 释放环形队列内存
return 0;
}
该段代码进行了一个简单的测试,生产者线程通过Productor()
函数循环向队列中放置0-9
的数据并打印对应信息;
消费者线程通过Consumer()
函数每隔0.8
秒循环向队列中取出数据并打印对应信息;
主函数创建了三个生产者线程与三个消费者线程进行这些工作;
代码运行结果为:
$ ./ringqueue
The thread- 96 production a data : 2
The thread-688 get a data : 9
The thread-392 production a data : 9
The thread-800 production a data : 4
The thread-984 get a data : 4
The thread-280 get a data : 2
The thread-800 production a data : 4
The thread- 96 production a data : 4
The thread-392 production a data : 6
...
...
与预期相同;
基于环形队列的生产者消费者模型发送任务测试
生产者消费者模型被设计成类模板,表示其可传输任何类型的数据,包括自定义类型;
假设存在一个任务为Task
:
/* Task.hpp */
#ifndef TASK_HPP
#define TASK_HPP
#include <iostream>
// 定义错误代码枚举
enum { DIV_ERR = 1, MOD_ERR, NONE };
class Task {
public:
Task() {} // 默认构造
// 便于环形生产者消费者模型能够进行默认构造初始化并进行默认拷贝构造
// 构造函数:初始化所有成员变量
Task(int num1, int num2, char oper)
: num1_(num1), num2_(num2), exit_code_(0), result_(0), oper_(oper) {}
// 析构函数(当前为空)
~Task() {}
// 执行任务的主要函数
void run() {
switch (oper_) {
case '+':
result_ = num1_ + num2_;
break;
case '-':
result_ = num1_ - num2_;
break;
case '*':
result_ = num1_ * num2_;
break;
case '/': {
if (num2_ == 0) {
exit_code_ = DIV_ERR; // 设置除零错误
result_ = -1; // 除零时结果设为-1
} else
result_ = num1_ / num2_;
break;
}
case '%': {
if (num2_ == 0) {
exit_code_ = MOD_ERR; // 设置模零错误
result_ = -1; // 模零时结果设为-1
} else
result_ = num1_ % num2_;
break;
}
default:
exit_code_ = NONE; // 未知操作符
break;
}
}
// 重载()运算符,使对象可以像函数一样被调用
void operator()() { run(); }
// 获取计算结果
int getresult() { return result_; }
// 获取退出代码
int getexitcode() { return exit_code_; }
// 获取第一个操作数
int getnum1() { return num1_; }
// 获取第二个操作数
int getnum2() { return num2_; }
// 获取操作符
char getoper() { return oper_; }
private:
int num1_; // 第一个操作数
int num2_; // 第二个操作数
int exit_code_; // 退出代码,用于表示操作是否成功
int result_; // 计算结果
char oper_; // 操作符
};
#endif
其中定义了几个getter
函数获取对应的私有属性;
使用run()
函数进行核心操作,实现了基本的+-*/%
,并将()
运算符重载为该函数;
其中对应的测试代码为如下(单生产者单消费者情况):
/* main.cc */
using namespace std;
// 定义可能的运算符
string opers = "+-*/%";
// 生产者线程函数
void *Productor(void *args) {
RingQueue<Task> *rq = static_cast<RingQueue<Task> *>(args);
while (true) {
int data1 = rand() % 10;
usleep(10);
int data2 = rand() % 10;
usleep(10);
char op = opers[rand() % opers.size()];
Task task(data1, data2, op);
rq->Push(task);
printf("The thread-%3lu sent a task : %d %c %d = ?\n",
pthread_self() % 1000, data1, op, data2);
sleep(1);
}
return nullptr;
}
// 消费者线程函数
void *Consumer(void *args) {
RingQueue<Task> *rq = static_cast<RingQueue<Task> *>(args);
while (true) {
Task task;
rq->Pop(&task);
task();
printf("The thread-%3lu get a task : %d %c %d = %2d , exitcode: %d\n",
pthread_self() % 1000, task.getnum1(), task.getoper(),
task.getnum2(), task.getresult(), task.getexitcode());
sleep(1);
}
return nullptr;
}
int main() {
srand(time(nullptr)); // 初始化随机数种子
RingQueue<Task> *rq = new RingQueue<Task>(); // 创建任务队列
pthread_t p_tids[1], c_tids[1]; // 定义1个生产者和1个消费者线程
// 创建1个生产者线程
pthread_create(&p_tids[0], nullptr, Productor, rq);
// 创建1个消费者线程
pthread_create(&c_tids[0], nullptr, Consumer, rq);
// 等待生产者线程结束(实际上不会结束)
pthread_join(p_tids[0], nullptr);
// 等待消费者线程结束(实际上不会结束)
pthread_join(c_tids[0], nullptr);
delete rq; // 释放队列内存(实际上不会执行到这里)
return 0;
}
其中Productor()
函数随机生成两个0-9
之间的数字和一个运算符将其构造出一个Task
对象并推入队列中进行生产工作并打印对应生成的任务信息;
Consumer()
函数以默认构造创建一个Task
对象并将该对象取地址调用Pop()
函数取出,并直接调用task()
进行执行任务而后打印任务结果和退出码,每消费一个任务后sleep(1)
;
main()
函数创建一个RingQueue<Task>
对象同时创建一个生产者和一个消费者并等待这两个线程结束;
运行结果为:
$ ./ringqueue
The thread-648 sent a task : 9 * 0 = ?
The thread-944 get a task : 9 * 0 = 0 , exitcode: 0
The thread-648 sent a task : 8 + 3 = ?
The thread-944 get a task : 8 + 3 = 11 , exitcode: 0
The thread-648 sent a task : 5 * 1 = ?
The thread-944 get a task : 5 * 1 = 5 , exitcode: 0
The thread-648 sent a task : 9 * 9 = ?
The thread-944 get a task : 9 * 9 = 81 , exitcode: 0
The thread-648 sent a task : 5 - 8 = ?
The thread-944 get a task : 5 - 8 = -3 , exitcode: 0
The thread-648 sent a task : 3 / 2 = ?
The thread-944 get a task : 3 / 2 = 1 , exitcode: 0
The thread-648 sent a task : 4 / 1 = ?
The thread-944 get a task : 4 / 1 = 4 , exitcode: 0
...
...
印任务结果和退出码,每消费一个任务后sleep(1)
;
main()
函数创建一个RingQueue<Task>
对象同时创建一个生产者和一个消费者并等待这两个线程结束;
运行结果为:
$ ./ringqueue
The thread-648 sent a task : 9 * 0 = ?
The thread-944 get a task : 9 * 0 = 0 , exitcode: 0
The thread-648 sent a task : 8 + 3 = ?
The thread-944 get a task : 8 + 3 = 11 , exitcode: 0
The thread-648 sent a task : 5 * 1 = ?
The thread-944 get a task : 5 * 1 = 5 , exitcode: 0
The thread-648 sent a task : 9 * 9 = ?
The thread-944 get a task : 9 * 9 = 81 , exitcode: 0
The thread-648 sent a task : 5 - 8 = ?
The thread-944 get a task : 5 - 8 = -3 , exitcode: 0
The thread-648 sent a task : 3 / 2 = ?
The thread-944 get a task : 3 / 2 = 1 , exitcode: 0
The thread-648 sent a task : 4 / 1 = ?
The thread-944 get a task : 4 / 1 = 4 , exitcode: 0
...
...
结果与预期相符;