文章目录
- 一、解释信号量
- 1.1 概念的引入
- 1.2 信号量操作和使用接口
- 二、信号量使用场景
- 2.1 引入环形队列&&生产消费问题
- 2.2 代码实现
- 2.3 对于多生产多消费的情况
- 2.4 申请信号量和加锁的顺序问题
- 2.5 多生产多消费的意义
一、解释信号量
1.1 概念的引入
我们知道,一个线程在访问临界资源时,临界资源必须要是满足条件的。但是,在线程访问资源前,无法得知这块资源是否满足生产或消费的条件。所以,线程只能先对这块资源加锁,然后检测其是否满足条件,再进行操作,最后再释放锁。可是,检测的过程本质上也是在访问临界资源。
只要一个线程对一块资源加了锁,就默认该线程对这个资源的整体使用。
但实际情况中可能存在,一份公共资源是允许多个线程同时访问其中的不同区域的。所以,在这种情况下,一个线程要访问资源,就必须先申请信号量。
信号量的本质是一把衡量临界资源中资源数量多少的计数器,拥有信号量就意味着,在未来一定能够拥有临界资源的一部分。申请信号量的本质是对临界资源中特定某一部分资源的预定机制。
所以,有了信号量,就意味着在访问临界资源之前,就可以知道临界资源的使用情况。换言之,如果申请信号量成功,就说明临界资源中一定有可以访问的资源;失败说明不满足条件,必须进行等待。所以,申请信号量成功与否,就能说明是否可以访问临界资源。这样也就不需要先进行判断了。
1.2 信号量操作和使用接口
首先,线程要访问临界资源中的某一部分,就必须先申请信号量。也就是说,信号量要能够被所有线程看到,即信号量本身是公共资源。
而因为信号量是衡量资源中资源数量多少的计数器,所以当线程访问资源的时候,它必须进行–操作;当线程归还资源的时候,它必须进行++操作。而为了保证++、–的过程不会被其他线程打断,就必须保证操作的原子性。其中,信号量–的操作叫做P操作,++的操作叫做V操作。而信号量的核心操作就是PV操作。
信号量基本使用接口如下:
①初始化信号量
#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
参数:
pshared:0表示线程间共享,非零表示进程间共享
value:信号量初始值
②销毁信号量
int sem_destroy(sem_t *sem);
③等待信号量
功能:等待信号量,会将信号量的值减1
int sem_wait(sem_t *sem); //P()
④发布信号量
功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1。
int sem_post(sem_t *sem);//V()
二、信号量使用场景
2.1 引入环形队列&&生产消费问题
经过前面的铺垫,想必大家已经对信号量和互斥锁适合使用的场景有了大致的轮廓。
互斥锁更适用于一整块的临界资源,而信号量更适用于看似是一块临界资源,但其实是可以分成一个个小部分的资源块的资源。
所以,这里引入一个符合条件的适用于信号量的存储资源的结构----环形队列。
环形队列采用数组模拟,用模运算来模拟环状特性
环形结构起始状态和结束状态都是一样的,不好判断为空或者为满,所以可以通过加计数器或者标记位来判断满或者空。另外也可以预留一个空的位置,作为满的状态。(这里采用计数器的方式,也就是使用信号量)
具体的细节实现就不解释了,相信大家学到这个程度已经都熟稔于心了。
而这里,我们让生产者和消费者都访问这个环形队列,生产者向队列中写入数据,而消费者从队列中读取数据(相当于把数据弹出),该过程中二者应该是并发的。
写代码之前,需要知道环形队列为空和为满的时候,生产者和消费者是在同一个位置的,其他情况下都不在同一位置。
更重要的“游戏规则”是,消费者在队列中的位置一定不能超过生产者(未生产不能消费),生产者不能将消费者“套圈”(队列满了就不能再放入)。而队列为空时,生产者先访问队列,为满时,消费者先访问队列。
所以,只有队列为空和为满的时候,生产者消费者才存在同步和互斥的问题!
对于生产者来说,看中的时队列中的剩余空间;对于消费者而言,看中的是放入队列中的数据。所以,在实现代码时,我们应该定义两个信号量,分别用来维护空间资源和数据资源。
2.2 代码实现
首先还是老规矩,定义一个环形队列的类,文件为RingQueue.hpp,内容如下:
#pragma once
#include <iostream>
#include <vector>
#include <semaphore.h>
#include <cassert>
static const int gcap=5;
template<class T>
class RingQueue
{
private:
void P(sem_t& sem)
{
int n=sem_wait(&sem);
assert(n==0);
(void)n;
}
void V(sem_t& sem)
{
int n=sem_post(&sem);
assert(n==0);
(void)n;
}
public:
RingQueue(const int& cap=gcap):_queue(cap),_cap(cap)
{
int n=sem_init(&_spaceSem,0,_cap);
assert(n==0);
n=sem_init(&_dataSem,0,0);
assert(n==0);
_productorStep=_consumerStep=0;
}
void Push(const T& in)
{
P(_spaceSem);//申请空间信号量成功就一定能进行生产
_queue[_productorStep++]=in;
_productorStep%=_cap;
V(_dataSem);
}
void Pop(T* out)
{
P(_dataSem);
*out=_queue[_consumerStep++];
_consumerStep%=_cap;
V(_spaceSem);
}
~RingQueue()
{
sem_destroy(&_spaceSem);
sem_destroy(&_dataSem);
}
private:
std::vector<T> _queue;
int _cap;//队列容量
sem_t _spaceSem;//生产者看重的空间资源信号量
sem_t _dataSem;//消费者看重的数据资源信号量
int _productorStep;
int _consumerStep;
};
然后,在Main.cc中就可以用这个类来完成生产者和消费者各自的任务了,内容如下:
#include "RingQueue.hpp"
#include <pthread.h>
#include <ctime>
#include <cstdlib>
#include <unistd.h>
#include <sys/types.h>
void* ProductorRoutine(void* rq)
{
RingQueue<int>* ringqueue=static_cast<RingQueue<int>*>(rq);
while(true)
{
sleep(2);
int data=rand()%10+1;
ringqueue->Push(data);
std::cout<<"生产完成,生产数据: "<<data<<std::endl;
}
}
void* ConsumerRoutine(void* rq)
{
RingQueue<int>* ringqueue=static_cast<RingQueue<int>*>(rq);
while(true)
{
int data;
ringqueue->Pop(&data);
std::cout<<"消费完成,消费数据: "<<data<<std::endl;
}
}
int main()
{
srand((unsigned int)time(nullptr)^getpid()^pthread_self());
RingQueue<int>* rq=new RingQueue<int>();
pthread_t c,p;
pthread_create(&p,nullptr,ProductorRoutine,rq);
pthread_create(&c,nullptr,ConsumerRoutine,rq);
pthread_join(p,nullptr);
pthread_join(c,nullptr);
delete rq;
return 0;
}
Makefile内容如下:
ringqueue:Main.cc
g++ -o $@ $^ -std=c++11 -lpthread
.PHONY:clean
clean:
rm -f ringqueue
需要注意的是,上面代码中设置的生产者每一次生产之前都要休眠两秒,而对消费者不做处理。所以代码执行结果一定是生产者每生产一次,消费者就能立刻消费。
运行结果如下:
[sny@VM-8-12-centos circlequeue]$ ./ringqueue
生产完成,生产数据: 7
消费完成,消费数据: 7
生产完成,生产数据: 7
消费完成,消费数据: 7
生产完成,生产数据: 8
消费完成,消费数据: 8
^C
可见,结果和预测相同。
除此之外,我们也可以用在之前的文章中封装过的任务派发类,来给生产者派发任务,而让消费者处理任务。
新建Task.hpp文件内容如下:
#pragma once
#include <iostream>
#include <functional>
#include <cstdio>
class Task
{
using func_t =std::function<int(int,int,char)>;
public:
Task()
{}
Task(int x,int y,char op,func_t func):_x(x),_y(y),_op(op),_callback(func)
{}
std::string operator()()
{
int result=_callback(_x,_y,_op);
char buffer[1024];
snprintf(buffer,sizeof buffer,"%d %c %d = %d",_x,_op,_y,result);
return buffer;
}
std::string toTaskString()
{
char buffer[1024];
snprintf(buffer,sizeof buffer,"%d %c %d = ?",_x,_op,_y);
return buffer;
}
private:
int _x;
int _y;
func_t _callback;
char _op;
};
const std::string oper="+-*/%";
int mymath(int x,int y,char op)
{
int result=0;
switch(op)
{
case '+':
result= x+y;
break;
case '-':
result= x-y;
break;
case '*':
result= x*y;
break;
case '/':
if(y==0)
{
std::cerr<<"div zero error!"<<std::endl;
result=-1;
}
else
result=x/y;
break;
case '%':
if(y==0)
{
std::cerr<<"mod zero error!"<<std::endl;
result=-1;
}
else
result=x%y;
break;
default:
break;
}
return result;
}
对Main.cc内容稍作修改,如下:
#include "RingQueue.hpp"
#include "Task.hpp"
#include <pthread.h>
#include <ctime>
#include <cstdlib>
#include <unistd.h>
#include <sys/types.h>
void* ProductorRoutine(void* rq)
{
RingQueue<Task>* ringqueue=static_cast<RingQueue<Task>*>(rq);
while(true)
{
sleep(2);
//获取任务
int x=rand()%1000;
int y=rand()%1500;
char op=oper[rand()%oper.size()];
Task t(x,y,op,mymath);
//生产任务
ringqueue->Push(t);
std::cout<<"生产者派发任务: "<<t.toTaskString()<<std::endl;
}
}
void* ConsumerRoutine(void* rq)
{
RingQueue<Task>* ringqueue=static_cast<RingQueue<Task>*>(rq);
while(true)
{
Task t;
//消费任务
ringqueue->Pop(&t);
std::string result=t();
std::cout<<"消费者消费任务: "<<result<<std::endl;
}
}
int main()
{
srand((unsigned int)time(nullptr)^getpid()^pthread_self());
RingQueue<Task>* rq=new RingQueue<Task>();
pthread_t c,p;
pthread_create(&p,nullptr,ProductorRoutine,rq);
pthread_create(&c,nullptr,ConsumerRoutine,rq);
pthread_join(p,nullptr);
pthread_join(c,nullptr);
delete rq;
return 0;
}
运行结果如下:
[sny@VM-8-12-centos circlequeue]$ ./ringqueue
生产者派发任务: 912 % 178 = ?
消费者消费任务: 912 % 178 = 22
生产者派发任务: 282 * 951 = ?
消费者消费任务: 282 * 951 = 268182
生产者派发任务: 658 % 173 = ?
消费者消费任务: 658 % 173 = 139
^C
2.3 对于多生产多消费的情况
上面的代码中实现的很明显是单生产单消费的情况,那么如果有多个生产者和多个消费者又该如何实现呢?
要知道的是,不管有多少个生产者和消费者,一次只能有一个生产者和一个消费者访问环形队列。所以,应该让生产者和消费者之间决出一个竞争能力较强的线程,进而又去执行单生产单消费的任务。由于生产者和生产者之间、消费者和消费者之间是互斥的关系,所以一定要有两把锁分别控制生产者和消费者。
所以,再对代码做出修改。
Main.cc内容如下:
std::string ThreadName()
{
char name[128];
snprintf(name,sizeof(name),"thread[0x%x]",pthread_self());
return name;
}
void* ProductorRoutine(void* rq)
{
RingQueue<Task>* ringqueue=static_cast<RingQueue<Task>*>(rq);
while(true)
{
sleep(2);
//获取任务
int x=rand()%1000;
int y=rand()%1500;
char op=oper[rand()%oper.size()];
Task t(x,y,op,mymath);
//生产任务
ringqueue->Push(t);
std::cout<<ThreadName()<<",生产者派发任务: "<<t.toTaskString()<<std::endl;
}
}
void* ConsumerRoutine(void* rq)
{
RingQueue<Task>* ringqueue=static_cast<RingQueue<Task>*>(rq);
while(true)
{
Task t;
//消费任务
ringqueue->Pop(&t);
std::string result=t();
std::cout<<ThreadName()<<",消费者消费任务: "<<result<<std::endl;
}
}
int main()
{
srand((unsigned int)time(nullptr)^getpid()^pthread_self());
RingQueue<Task>* rq=new RingQueue<Task>();
pthread_t c[8],p[4];
for(int i=0;i<4;i++)
{
pthread_create(p+i,nullptr,ProductorRoutine,rq);
}
for(int i=0;i<8;i++)
{
pthread_create(c+i,nullptr,ConsumerRoutine,rq);
}
for(int i=0;i<4;i++)
{
pthread_join(p[i],nullptr);
}
for(int i=0;i<8;i++)
{
pthread_join(c[i],nullptr);
}
delete rq;
return 0;
}
RingQueue.hpp内容如下:
template<class T>
class RingQueue
{
private:
void P(sem_t& sem)
{
int n=sem_wait(&sem);
assert(n==0);
(void)n;
}
void V(sem_t& sem)
{
int n=sem_post(&sem);
assert(n==0);
(void)n;
}
public:
RingQueue(const int& cap=gcap):_queue(cap),_cap(cap)
{
int n=sem_init(&_spaceSem,0,_cap);
assert(n==0);
n=sem_init(&_dataSem,0,0);
assert(n==0);
_productorStep=_consumerStep=0;
pthread_mutex_init(&_pmutex,nullptr);
pthread_mutex_init(&_cmutex,nullptr);
}
void Push(const T& in)
{
pthread_mutex_lock(&_pmutex);
P(_spaceSem);//申请空间信号量成功就一定能进行生产
_queue[_productorStep++]=in;
_productorStep%=_cap;
V(_dataSem);
pthread_mutex_unlock(&_pmutex);
}
void Pop(T* out)
{
pthread_mutex_lock(&_cmutex);
P(_dataSem);
*out=_queue[_consumerStep++];
_consumerStep%=_cap;
V(_spaceSem);
pthread_mutex_unlock(&_cmutex);
}
~RingQueue()
{
sem_destroy(&_spaceSem);
sem_destroy(&_dataSem);
pthread_mutex_destroy(&_pmutex);
pthread_mutex_destroy(&_cmutex);
}
private:
std::vector<T> _queue;
int _cap;//队列容量
sem_t _spaceSem;//生产者看重的空间资源信号量
sem_t _dataSem;//消费者看重的数据资源信号量
int _productorStep;
int _consumerStep;
pthread_mutex_t _pmutex;
pthread_mutex_t _cmutex;
};
执行结果如下:
[sny@VM-8-12-centos circlequeue]$ ./ringqueue
thread[0xbaaaa700],生产者派发任务: 730 * 1478 = ?
thread[0xb8aa6700],消费者消费任务: 881 / 481 = 1
thread[0xb82a5700],消费者消费任务: 2 + 874 = 876
thread[0xba2a9700],生产者派发任务: 334 / 1437 = ?
thread[0xb92a7700],消费者消费任务: 334 / 1437 = 0
thread[0xbb2ab700],生产者派发任务: 881 / 481 = ?
thread[0xbaaaa700],生产者派发任务: 990 * 373 = ?
thread[0xb72a3700],消费者消费任务: 990 * 373 = 369270
thread[0xba2a9700],生产者派发任务: 590 + 693 = ?
thread[0xb9aa8700],生产者派发任务: 985 - 912 = ?
thread[0xb72a3700],消费者消费任务: 590 + 693 = 1283
^C
2.4 申请信号量和加锁的顺序问题
现在来谈一下,是先加锁好,还是先申请信号量好?
答案是先申请信号量更好。
因为首先申请信号量的过程本来就是原子的,不需要将其放在申请锁之后。
其次,如果先申请锁,那么没有申请到锁的线程什么也干不了,整个过程只有申请到锁的那一个线程在“忙前忙后”。而如果先申请信号量,则申请到信号量的线程可以去申请锁,而其他线程也可以同时在申请信号量,明显提高了效率。
当然,两种方式运行时间的长短,感兴趣的读者可以将上面的代码复制粘贴,然后修改信号量和锁的先后位置,运行观察一下,这里就不演示了。
2.5 多生产多消费的意义
这个话题跟上一篇文章中----阻塞队列中多线程的意义是一样的。
即一个线程在访问队列的时候,其他的线程也可以获取和执行任务,提升了效率。
本篇完,青山不改,绿水长流!