目录
一、信号量
1.提出问题
2.信号量的概念
3.信号量的基本操作
(1)PV操作
(2)信号量的使用
二、基于环形队列的生产消费模型
1.环形队列
(1)复习
(2)现象
(3)核心控制原则
2.实现模型
(1)设计环形队列类
(2)实现环形队列类
(3)多生产多消费模型
一、信号量
1.提出问题
还记得之前生产消费模型中阻塞队列类中,生产者向阻塞队列中增加任务的push函数吗?
//阻塞队列类
template<class T>
class Blockqueue
{
public:
//构造函数
//析构函数
//生产数据
void push(const T& data)
{
//下面的判断就开始使用共享资源,需要加锁
pthread_mutex_lock(&_mutx);
//如果当前队列是满的,那就需要将生产者线程加入等待队列挂起
while(Isfull())
{
pthread_cond_wait(&_pcond, &_mutx);
}
_q.push(data);
//唤醒消费者线程消费
pthread_cond_signal(&_ccond);
//解锁
pthread_mutex_unlock(&_mutx);
}
//消费数据
private:
//检测队列是否装满
size_t Isfull() const
{
return (_q.size() == _capcity);
}
std::queue<T> _q;
pthread_mutex_t _mutx;
pthread_cond_t _pcond;
pthread_cond_t _ccond;
size_t _capcity;
};
线程在向阻塞队列中push任务的时候,必须满足阻塞队列不能满的条件,否则就会被放入条件变量的等待队列中。但检测队列是否为满实质是对临界资源的操作,所以在访问前必须先申请锁,
对于申请锁、检测临界资源、生产数据、释放锁的整个流程,线程在申请锁之前是没有办法获知临界资源的状态的。
我们对资源加锁实质上默认线程操作时使用了资源的全部内容,但是实际情况很可能线程只是需要访问临界资源的一部分。
如果线程只是使用临界资源的一部分,那我们不妨将资源划分为多个不同的区域,这样多个线程就可以同时访问不同的区域,程序的效率会再次提高。
所以我们原来实现的模型有以下缺点:
- 访问临界资源前,无法得知临界资源的情况。
- 多个线程不能同时访问临界资源的不同区域。
2.信号量的概念
正是为了解决上述问题,信号量被提出了。
- 信号量本质是一个计数器,数字的大小表示临界资源中资源数量多少。
- 申请信号量本质是对临界资源中特定的小块资源的预定机制。
- 信号量也是一种互斥量,只要申请到信号量的线程,在未来一定能够拥有一份临界资源。
我们将一块临界资源分为4个不同的区域。如果想要让多个线程同时访问这4个不同的区域,就可以创建一个值为4的信号量。
如果线程需要访问临界资源,那它们就都要先申请信号量。如果一个线程申请到了信号量,那信号量的数值就要减一。
当数值减到0的时候,说明临界资源的4个区域都有线程访问。其他想访问临界资源的线程由于申请信号量不成功,就只能阻塞等待。
当某一个线程访问临界资源完毕时,线程归还信号量,信号量的值加一,信号量又会继续被其他线程申请。
线程访问临界资源的哪个区域由程序员决定,但是前提是保证一个区域同一时刻只有一个线程访问。
信号量确实解决了上述问题。
- 第一,线程通过申请信号量就可以知晓临界资源的情况,能申请到证明临界资源准备好了,申请不到证明临界资源有线程访问,所以不用访问临界资源就可以知道资源的使用情况。
- 第二,分区访问允许多线程同时对共享资源访问,有效提升了程序的效率。
3.信号量的基本操作
(1)PV操作
信号量的两个基本操作就是P操作和V操作。
- P操作:信号量值减一(sem–),相当于线程在申请资源,该操作必须是原子的。
- V操作:信号量值加一(sem++),相当于线程在归还资源,该操作也必须是原子的。
信号量和锁一样,只有被所有线程看到才能申请,所以信号量也是公共资源。而信号量的基本操作就是对信号量进行加一和减一,站在线程安全的角度,P操作和V操作必须是原子的。
(2)信号量的使用
首先,信号量也是一个类(sem_t),也可以构造对象(sem_t sem就是一个信号量对象),对象内也有成员函数。
int sem_init(sem_t* sem, int pshared, unsigned int value);
头文件:semaphore.h
功能:初始化信号量。
参数:sem_t* sem表示需要被初始化的信号量的地址。int pshared表示条件变量是否进程间共享,0表示共享,非零表示不共享,一般都直接设为0。unsigned int value表示信号量的初始值,也就是计数器的初始值。
返回值:初始化成功返回0,失败返回-1并将错误码设置进errno。
int sem_destroy(sem_t* sem);
头文件:semaphore.h
功能:销毁信号量。
参数:sem_t* sem表示需要被销毁的信号量的地址。
返回值:初始化成功返回0,失败返回-1并将错误码设置进errno。
int sem_wait(sem_t* sem);
头文件:semaphore.h
功能:申请(等待)信号量,也就是上面的P操作。
参数:sem_t* sem表示需要申请的信号量地址。
返回值:初始化成功返回0,失败返回-1并将错误码设置进errno。
int sem_post(sem_t* sem);
头文件:semaphore.h
功能:归还(发布)信号量,也就是上面的V操作。
参数:sem_t* sem表示需要归还的信号量地址。
返回值:初始化成功返回0,失败返回-1并将错误码设置进errno。
我在这里插一嘴,现在操作系统的进程间通信普遍使用systemV和POSIX两种标准。正是因为信号量和互斥锁都使用了POSIX标准,所以信号量与互斥锁的接口非常类似。
二、基于环形队列的生产消费模型
1.环形队列
(1)复习
对于同样的生产消费模型,我们也可以使用唤醒队列储存任务。
在我们之前数据结构中实现环形队列中有以下原则:
- 环形队列使用数组模拟,储存数据的数量一定。
- 环形队列下标的加减运算需要重载,在达到数组尾时对下标取模。
- 当环形队列为空和为满时,头和尾下标指向同一位置。所以可以使用空一位做标志位,或者增加储存当前元素个数的变量。
(2)现象
在这里我们不分析环形队列的具体实现,而是只宏观查看现象。
现象一:环形队列为空时,生产者和消费者访问同一个位置。
生产者在队尾向队列中生产数据,而消费者在队首从队列中拿取数据。由于环形队列为空,队首和队尾处于同一个位置,所以生产者消费者访问同一位置。(这个位置可以是任意下标处)
现象二:环形队列为满时,生产者和消费者也会访问同一个位置。
当环形队列被填满的时候,生产者访问队尾,消费者访问队首。生产者线程在最后一个空位生产完数据后,跳转到下一个位置准备继续生产。由于队列是环形的,队尾的下一个位置就是队首,所以生产者消费者依旧访问同一位置。
现象三:除去队满和队空,生产者和消费者访问的都是不同的位置。
这个其实和我们之前小学的追及问题没什么区别,只有队列满或者空的时候二者才能追上。
(3)核心控制原则
原则一:消费者不能超过生产者。
消费者消费的必须是生产者生产的数据,当消费者超过生产者后,消费者访问的区域根本没有生产者生产的数据,所以这种情况没有任何意义。
原则二:生产者不能把消费者套圈。
环形队列满了以后,如果生产者继续生产,就会将消费者还没来得及消费的数据覆盖掉,数据也就丢失了。
2.实现模型
(1)设计环形队列类
由于生产者只负责将数据生产到环形队列中,当环形队列满了以后就不能生产了,所以它只在意队尾后还有多少空间供它生产数据。
由于消费者只负责从环形队列中取数据,当环形队列空了以后就不能消费了,所以它只关心队首到队尾有几个数据可以供它消费。
所以我们得到以下结论:
- 生产者在意环形队列中空闲空间可存储数据的个数。
- 消费者在意环形队列中数据的个数。
所以,我们对空间资源定义一个信号量,统计空闲空间储存元素的个数;再对数据资源定义一个信号量,统计数据个数。
生产者在访问临界资源之前,需要先申请空间资源的信号量,申请成功就可以生产数据,否则只能阻塞等待;消费者在访问临界资源之前,需要先申请数据资源的信号量,申请成功就可以消费数据,否则只能阻塞等待。
空间资源信号量的申请由生产者进行,归还(V操作)由消费者进行;数据资源信号量的申请有消费者进行,归还(V操作)由生产者进行。换句话说就是,生产者先申请空间信号量,生产完成则归还资源信号量;消费者先申请资源信号量,消费完成则归还空间信号量。
生产者和消费者访问环形队列是指访问队列的下标位,所以还要添加两个线程的下标位。
通过信号量维护环形队列完全符合我们的设计原则,消费者速度快时,资源信号量全部申请完时。如果生产者没有生产数据,也就没有归还数据资源的信号量,所以消费者会阻塞等待,不会超过生产者。
同样生产者生产速度快时,空间资源信号量全部北申请完。如果消费者没有消费数据,也就没有归还空间资源的信号量,所以生产者会阻塞等待,不会超过套消费者一个圈。
(2)实现环形队列类
包含六个成员变量,PV操作封装,push和pop,构造和析构函数。
template<class T>
class Ringqueue
{
private:
//申请信号量,P操作
void P(sem_t& sem)
{
int n = sem_wait(&sem);
assert(n == 0);
}
//归还信号量,V操作
void V(sem_t& sem)
{
int n = sem_post(&sem);
assert(n == 0);
}
public:
//构造函数
Ringqueue(int capacity = NUM)
:_capacity(capacity)
,_q(capacity)
{
int n = sem_init(&_spacesem, 0, _capacity);//初始化空间信号量
assert(n == 0);//断言初始化成功
n = sem_init(&_datasem, 0, 0);//初始化数据信号量
assert(n == 0);//断言初始化成功
_cindex = 0;
_pindex = 0;//生产者和消费者最初访问下标为0
}
~Ringqueue()
{
int n = sem_destroy(&_spacesem);//销毁空间信号量
assert(n == 0);//断言销毁成功
n = sem_destroy(&_datasem);//销毁数据信号量
assert(n == 0);//断言销毁成功
}
//生产数据
void push(const T& data)
{
P(_spacesem);//申请空间信号量
_q[_pindex++] = data;
_pindex %= _capacity;//生产数据
V(_datasem);//归还数据信号量
}
//消费数据
void pop(T* pdata)
{
P(_datasem);//申请数据信号量
*pdata = _q[_cindex++];
_cindex %= _capacity;//消费数据
V(_spacesem);//归还空间信号量
}
private:
std::vector<T> _q;//环形队列
int _capacity;//队列容量
sem_t _spacesem;//空余空间信号量
sem_t _datasem;//有效数据信号量
int _pindex;//生产者访问的下标
int _cindex;//消费者访问的下标
};
我们将原来阻塞队列的代码复用一下,就实现了一个单生产者和单消费者的基于环形队列的生产消费模型。
ring_queue.h
#include<iostream>
#include<stdlib.h>
#include<time.h>
#include<string>
#include<unistd.h>
#include<functional>
#include<stdio.h>
#include<vector>
#include<assert.h>
#include<semaphore.h>
#define NUM 10
using namespace std;
//计算任务类
class CalTask
{
typedef std::function<int(int,int,char)> func_t;
public:
//默认构造
CalTask()
{}
//构造函数
CalTask(int a, int b, char op, func_t func)
:_a(a)
,_b(b)
,_op(op)
,_func(func)
{}
//仿函数
string operator()()
{
int result = _func(_a, _b, _op);
char buffer[64];
snprintf(buffer, sizeof(buffer), "%d %c %d = %d\n", _a, _op, _b, result);
string s(buffer);
return s;
}
//显示任务
string show_task()
{
char buffer[64];
snprintf(buffer, sizeof(buffer), "%d %c %d = ?\n", _a, _op, _b);
string s(buffer);
return s;
}
private:
func_t _func;
int _a;
int _b;
char _op;
};
template<class T>
class Ringqueue
{
private:
//申请信号量,P操作
void P(sem_t& sem)
{
int n = sem_wait(&sem);
assert(n == 0);
}
//归还信号量,V操作
void V(sem_t& sem)
{
int n = sem_post(&sem);
assert(n == 0);
}
public:
//构造函数
Ringqueue(int capacity = NUM)
:_capacity(capacity)
,_q(capacity)
{
int n = sem_init(&_spacesem, 0, _capacity);//初始化空间信号量
assert(n == 0);//断言初始化成功
n = sem_init(&_datasem, 0, 0);//初始化数据信号量
assert(n == 0);//断言初始化成功
_cindex = 0;
_pindex = 0;//生产者和消费者最初访问下标为0
}
~Ringqueue()
{
int n = sem_destroy(&_spacesem);//销毁空间信号量
assert(n == 0);//断言销毁成功
n = sem_destroy(&_datasem);//销毁数据信号量
assert(n == 0);//断言销毁成功
}
//生产数据
void push(const T& data)
{
P(_spacesem);//申请空间信号量
_q[_pindex++] = data;
_pindex %= _capacity;//生产数据
V(_datasem);//归还数据信号量
}
//消费数据
void pop(T* pdata)
{
P(_datasem);//申请数据信号量
*pdata = _q[_cindex++];
_cindex %= _capacity;//消费数据
V(_spacesem);//归还空间信号量
}
private:
std::vector<T> _q;//环形队列
int _capacity;//队列容量
sem_t _spacesem;//空余空间信号量
sem_t _datasem;//有效数据信号量
int _pindex;//生产者访问的下标
int _cindex;//消费者访问的下标
};
produnce_consume.cc
#include"ring_queue.h"
using namespace std;
//计算器函数
const string ops = "+-*/%";
int calculate(int a, int b, char op)
{
int result = 0;
switch(op)
{
case '+':
result = a + b;
break;
case '-':
result = a - b;
break;
case '*':
result = a * b;
break;
case '/':
{
if(b == 0)
cerr << "除数不能为0\n";
else
result = a / b;
}
break;
case '%':
{
if(b == 0)
cerr << "取模的数字不能为0\n";
else
result = a % b;
}
break;
default:
break;
}
return result;
}
//生产者
void* Produce(void* args)
{
Ringqueue<CalTask>* rq = (Ringqueue<CalTask>*)args;
while(1)
{
sleep(1);
int a = rand()%10;
int b = rand()%10;
int opnum = rand()%ops.size();
CalTask data(a, b, ops[opnum], calculate);
string s = "数据生产完成,需要计算:";
rq->push(data);
s += data.show_task().c_str();
cout << s;
}
return nullptr;
}
//消费者
void* Consume(void* args)
{
Ringqueue<CalTask>* rq = (Ringqueue<CalTask>*)args;
while(1)
{
//sleep(1);
CalTask data;
string s = "数据消费完成,计算结果为:";
rq->pop(&data);
string result = data();
s += result;
cout << s;
}
return nullptr;
}
#define NUM_PRODUCE 1
#define NUM_CONSUME 1
int main()
{
srand((unsigned int)time(nullptr));
Ringqueue<CalTask>* bq = new Ringqueue<CalTask>();
pthread_t ptids[NUM_PRODUCE];
pthread_t ctids[NUM_CONSUME];
//创建多个生产者线程
for(int i = 0; i<NUM_PRODUCE; ++i)
{
pthread_create(&ptids[i], nullptr, Produce, (void*)bq);
}
//创建多个消费者线程
for(int i = 0; i<NUM_CONSUME; ++i)
{
pthread_create(&ctids[i], nullptr, Consume, (void*)bq);
}
//回收所有线程
for(int i = 0; i<NUM_PRODUCE; ++i)
{
pthread_join(ptids[i], nullptr);
}
for(int i = 0; i<NUM_CONSUME; ++i)
{
pthread_join(ctids[i], nullptr);
}
return 0;
}
运行结果:
(3)多生产多消费模型
还是那句话,我们确实用信号量实现了生产者和消费者的互斥与同步,但是对于多线程中不同生产者之间和不同消费者之间的互斥却没有实现。
所以我们设置两把锁,生产者间竞争一把锁,消费者间竞争一把锁。这样就实现了对数据区域的串行访问,注意在环形队列中增加初始化、加锁解锁还有销毁锁的代码。
不过有一个问题,申请信号量和加锁都是访问环形队列前做的事情,所以申请互斥锁和申请信号量谁在前比较合适呢?
第一,申请互斥锁在前,申请信号量在后。
生产者(或消费者)先申请锁,再申请信号量,成功后访问临界资源。那如果一个生产者(或消费者)线程申请到锁后,信号量申请失败了,那线程就只能拿着锁阻塞,其他生产者(或消费者)线程就申请不到锁,整个程序就卡住了。
第二、申请信号量在前,申请互斥锁在后。
生产者(或消费者)先申请信号量,再申请锁,成功后访问临界资源,上面的问题就不会出现。而且对于线程来说,申请锁是有代价的,将信号量申请放在前面还可以减少申请锁的次数,所以申请信号量在互斥锁之前更合适。
ring_queue.h
#include<iostream>
#include<stdlib.h>
#include<time.h>
#include<string>
#include<unistd.h>
#include<functional>
#include<stdio.h>
#include<vector>
#include<assert.h>
#include<pthread.h>
#include<semaphore.h>
#define NUM 10
using namespace std;
//计算任务类
class CalTask
{
typedef std::function<int(int,int,char)> func_t;
public:
//默认构造
CalTask()
{}
//构造函数
CalTask(int a, int b, char op, func_t func)
:_a(a)
,_b(b)
,_op(op)
,_func(func)
{}
//仿函数
string operator()()
{
int result = _func(_a, _b, _op);
char buffer[64];
snprintf(buffer, sizeof(buffer), "%d %c %d = %d\n", _a, _op, _b, result);
string s(buffer);
return s;
}
//显示任务
string show_task()
{
char buffer[64];
snprintf(buffer, sizeof(buffer), "%d %c %d = ?\n", _a, _op, _b);
string s(buffer);
return s;
}
private:
func_t _func;
int _a;
int _b;
char _op;
};
template<class T>
class Ringqueue
{
private:
//申请信号量,P操作
void P(sem_t& sem)
{
int n = sem_wait(&sem);
assert(n == 0);
}
//归还信号量,V操作
void V(sem_t& sem)
{
int n = sem_post(&sem);
assert(n == 0);
}
public:
//构造函数
Ringqueue(int capacity = NUM)
:_capacity(capacity)
,_q(capacity)
{
int n = sem_init(&_spacesem, 0, _capacity);//初始化空间信号量
assert(n == 0);//断言初始化成功
n = sem_init(&_datasem, 0, 0);//初始化数据信号量
assert(n == 0);//断言初始化成功
pthread_mutex_init(&_pmutx, nullptr);
pthread_mutex_init(&_cmutx, nullptr);//构造两把锁
_cindex = 0;
_pindex = 0;//生产者和消费者最初访问下标为0
}
~Ringqueue()
{
int n = sem_destroy(&_spacesem);//销毁空间信号量
assert(n == 0);//断言销毁成功
n = sem_destroy(&_datasem);//销毁数据信号量
assert(n == 0);//断言销毁成功
pthread_mutex_destroy(&_pmutx);
pthread_mutex_destroy(&_cmutx);
//销毁两把锁
}
//生产数据
void push(const T& data)
{
P(_spacesem);//申请空间信号量
pthread_mutex_lock(&_pmutx);//多个生产者竞争这把锁
_q[_pindex++] = data;
_pindex %= _capacity;//生产数据
pthread_mutex_unlock(&_pmutx);//生产者还回这把锁
V(_datasem);//归还数据信号量
}
//消费数据
void pop(T* pdata)
{
P(_datasem);//申请数据信号量
pthread_mutex_lock(&_cmutx);//多个生产者竞争这把锁
*pdata = _q[_cindex++];
_cindex %= _capacity;//消费数据
pthread_mutex_unlock(&_cmutx);//生产者还回这把锁
V(_spacesem);//归还空间信号量
}
private:
std::vector<T> _q;//环形队列
int _capacity;//队列容量
sem_t _spacesem;//空余空间信号量
sem_t _datasem;//有效数据信号量
int _pindex;//生产者访问的下标
int _cindex;//消费者访问的下标
pthread_mutex_t _pmutx;//生产者线程互斥锁
pthread_mutex_t _cmutx;//消费者线程互斥锁
};
produce_consume.cc和上面的一样,只将NUM_PRODUCE和NUM_CONSUME改为大于1的数字就行了,我设置它们都是3。
运行结果: