1.线程互斥
进程线程间的互斥相关背景概念
- 临界资源:多线程执行流共享的资源就叫做临界资源。
- 临界区:每个线程内部,访问临界资源的代码,就叫做临界区。
- 互斥:任何时刻,互斥保证有且只有一个执行流进入临界区,访问临界资源,通常对临界资源起保护作用。
- 原子性:不会被任何调度机制打断的操作,该操作只有两态,要么完成,要么未完成。
互斥量mutex
大部分情况,线程使用的数据都是局部变量,变量的地址空间在线程栈空间内,这种情况,变量归属单个线程,其他线程无法获得这种变量。但有时候,很多变量都需要在线程间共享,这样的变量称为共享变量,可以通过数据的共享,完成线程之间的交互。
多个线程并发的操作共享变量,会带来一些问题。
下面的代码是模拟多个线程抢票的过程。
#include <iostream>
#include <string>
#include <cstring>
#include <unistd.h>
#include <pthread.h>
#include <memory>
#include "thread.hpp"
using namespace std;
//共享资源 ->火车票
int tickets = 100;
void* Get_tickets(void* args)
{
string username = static_cast<const char*>(args);
while(true)
{
if(tickets > 0)
{ //微秒时间,1秒 = 1000毫秒 1毫秒 = 1000微秒...
usleep(1245); //模拟抢票时间
cout << username << "正在抢票中... 票号:"<< tickets-- << endl;
}
else
break;
}
return nullptr;
}
int main()
{
//这里用的是封装好的线程
unique_ptr<Thread> thread1(new Thread(Get_tickets,(void*)"user1 ",1));
unique_ptr<Thread> thread2(new Thread(Get_tickets,(void*)"user2 ",2));
unique_ptr<Thread> thread3(new Thread(Get_tickets,(void*)"user3 ",3));
unique_ptr<Thread> thread4(new Thread(Get_tickets,(void*)"user4 ",4));
thread1->join();
thread2->join();
thread3->join();
thread4->join();
return 0;
}
把Linux线程库的接口进行封装,封装线程的代码如下:lesson11/test4/thread.hpp · 晚风不及你的笑/MyCodeStorehouse - 码云 - 开源中国 (gitee.com)
我们发现多个线程进行抢票居然出现了负数,这是违反常理的,不论是火车票还是电影票,100个位置卖了102张票,多出了两个人就会有问题。
为什么会出现上述的结果呢?
1. if 语句判断条件为真以后,代码并发的切换到其他线程。注:线程什么时候切换呢?比如时间片到了,来了更高优先级的线程,线程在等待的时候。
2. usleep 这个模拟漫长业务的过程,在这个漫长的业务过程中,可能有很多个线程会进入该代码段。
3. --ticket 操作本身就不是一个原子操作。
-- 操作并不是原子操作,而是对应三条汇编指令:
- load :将共享变量ticket从内存加载到寄存器中
- update : 更新寄存器里面的值,执行-1操作
- store :将新值,从寄存器写回共享变量ticket的内存地址
要解决上面的问题,需要做到三点:
- 代码必须要有互斥行为:当代码进入临界区执行时,不允许其他线程进入该临界区。
- 如果多个线程同时要求执行临界区的代码,并且临界区没有线程在执行,那么只能允许一个线程进入该临界区。
- 如果线程不在临界区中执行,那么该线程不能阻止其他线程进入临界区。
要做到上面这三点,其实就是需要一把锁,Linux上提供的这把锁叫互斥量。
互斥量的接口
初始化互斥量
初始化互斥量有两种方法:
1.静态分配 :pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER
上了锁后,抢票就正常了,但是我们发现只有一个线程一直在抢,这个问题后面说。
2.动态分配
int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrictattr);
参数:
mutex:要初始化的互斥量
attr:NULL
销毁互斥量
int pthread_mutex_destroy(pthread_mutex_t *mutex);
销毁互斥量需要注意:
- 使用 PTHREAD_ MUTEX_ INITIALIZER 初始化的互斥量不需要销毁
- 不要销毁一个已经加锁的互斥量
- 已经销毁的互斥量,要确保后面不会有线程再尝试加锁
互斥量加锁和解锁
int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);
返回值:成功返回0,失败返回错误号int pthread_mutex_trylock(pthread_mutex_t *mutex);
尝试申请锁,申请成功直接持有锁并返回0,申请失败出错返回错误码
调用 pthread_mutex_lock 时,可能会遇到以下情况:
- 互斥量处于未锁状态,该函数会将互斥量锁定,同时返回成功。
- 发起函数调用时,其他线程已经锁定互斥量,或者存在其他线程同时申请互斥量,但没有竞争到互斥量,那么pthread_ lock调用会陷入阻塞(执行流被挂起),等待互斥量解锁
#include <iostream>
#include <string>
#include <vector>
#include <cstring>
#include <unistd.h>
#include <pthread.h>
#include <memory>
#include "thread.hpp"
using namespace std;
//共享资源 ->火车票
int tickets = 100;
class ThreadData
{
public:
ThreadData(const string& name,pthread_mutex_t* mutex_p)
:_name(name),_mutex_p(mutex_p)
{}
~ThreadData()
{}
public:
string _name;
pthread_mutex_t* _mutex_p;
};
void* Get_tickets(void* args)
{
ThreadData* td = static_cast<ThreadData*>(args);
while(true)
{
pthread_mutex_lock(td->_mutex_p); //加锁
if(tickets > 0)
{ //微秒时间,1秒 = 1000毫秒 1毫秒 = 1000微秒...
usleep(1245); //模拟抢票时间
cout << td->_name << "正在抢票中... 票号:"<< tickets-- << endl;
pthread_mutex_unlock(td->_mutex_p); //解锁
}
else
{ //不能直接在外面解锁,因为有break,会直接跳过,导致锁没有解开的场景
pthread_mutex_unlock(td->_mutex_p);
break;
}
}
return nullptr;
}
int main()
{
#define NUM 4
pthread_mutex_t lock;
pthread_mutex_init(&lock,nullptr); //初始化锁
vector<pthread_t> tids(NUM);
for(int i = 0;i<NUM;++i)
{
char buffer[64];
snprintf(buffer,sizeof(buffer),"thread %d",i+1);
ThreadData* td = new ThreadData(buffer,&lock);
pthread_create(&tids[i],nullptr,Get_tickets,td);
}
for(const auto& Tids:tids)
{
pthread_join(Tids,nullptr);
}
pthread_mutex_destroy(&lock);//销毁锁
return 0;
}
这里也对互斥锁的接口进行了封装,感兴趣的可以看一下:lesson11/test4/Mutex.hpp · 晚风不及你的笑/MyCodeStorehouse - 码云 - 开源中国 (gitee.com)
上了锁后,票数正常了,没有出现负数,但是出现了某一个线程一直抢票的现象。这是为什么?
我们要知道,锁只规定了互斥访问,没有规定谁优先,谁先执行,也没有说必须轮着一个一个执行。所以我们让线程执行完抢票后,在等一会,模拟一下形成订单,有了这个等待的过程,其他的线程就可以抢到锁了。
如何看待锁(互斥量)?
我们要对临界区的资源上锁,首先要让所有线程看到这把锁,那么这个锁的本身就属于共享资源。全局变量(比如票)是要被保护的,锁是用来保护全局资源的,那么锁本身也是全局资源,锁的完全谁来保护呢?
这就需要保证加锁的过程必须是安全的,这个不需要我们操心,设计者在设计的时候就已经考虑到了,加锁的过程是原子的。也就是说,要么申请成功,要么申请不成功,不存在其他情况。
如果锁申请成功,那么就继续执行临界区的代码,如果暂时申请不成功,那么执行流会阻塞,直到其他有执行流释放锁,那么操作系统会唤醒该执行流,再次申请锁。
如果一个线程申请锁成功,正在访问临界资源期间,该线程可不可以被切换呢(从CPU中切走)? 答案是可以!
而该线程被切走了以后,其他线程可以访问临界区吗? 答案是不可以!
因为当持有锁的线程被切走时,是抱着锁被切走的,即便该线程被切走了,其他的线程依旧无法申请到锁,也无法访问临界区执行代码。直至该线程释放锁,其他线程才可以申请锁,访问临界资源。
对于其他线程而言,有意义的锁的状态只有两种:1.申请锁前,2.释放锁后。站在其他线程的角度,看待当前线程持有锁的过程,就是原子的!
互斥量实现原理(加锁解锁的原理)
经过上面的例子,大家已经意识到单纯的 i++ 或者 ++i 都不是原子的,有可能会有数据一致性问题
为了实现互斥锁操作,大多数体系结构都提供了swap或exchange指令,该指令的作用是把寄存器和内存单元的数据相交换,由于只有一条指令,保证了原子性,即使是多处理器平台,访问内存的总线周期也有先后,一个处理器上的交换指令执行时另一个处理器的交换指令只能等待总线周期。
下图为加锁解锁的汇编指令:
1.CPU内的寄存器只有一套,被所有执行流共享,2.CPU内寄存器上的内容是每个执行流私有的,该数据属于运行时的上下文,执行流被切换时要带走自己的上下文。
加锁:
- 第一步线程把0写入CPU的寄存器中(寄存器中的数据属于线程上下文,线程被切换时会带走上下文);
- 第二步将寄存器和互斥量的值进行交换,交换的本质就是将共享的数据交换到我的上下文当中;
- 第三步进行判断,当前占有CPU的线程,寄存器中的值是否大于0,大于则返回0,表示申请锁成功,否则申请不成功,线程挂起等待。
解锁:
把1拷贝到mutex里,唤醒等待互斥量的线程,然后返回。
因为线程在占用CPU执行时随时可能被切换,所以下图是线程A刚把第二步做完就被切换了。即使线程A还没有申请成功,但是已经拿到了锁的内容,我们就认为线程A已经申请到锁了,因为其他线程来申请也是申请不到的,只有等线程A释放了以后才可能申请到。
可重入和不可重入
重入:同一个函数被不同的执行流调用,当前一个流程还没有执行完,就有其他的执行流再次进入,我们称之为重入。一个函数在重入的情况下,运行结果不会出现任何不同或者任何问题,则该函数被称为可重入函数,否则,是不可重入函数。
常见可重入的情况
- 不使用全局变量或静态变量
- 不使用用malloc或者new开辟出的空间
- 不调用不可重入函数
- 不返回静态或全局数据,所有数据都有函数的调用者提供
- 使用本地数据,或者通过制作全局数据的本地拷贝来保护全局数据
常见不可重入的情况
- 调用了malloc/free函数,因为malloc函数是用全局链表来管理堆的
- 调用了标准I/O库函数,标准I/O库的很多实现都以不可重入的方式使用全局数据结构
- 可重入函数体内使用了静态的数据结构
线程安全和线程不安全
线程安全:多个线程并发执行同一段代码时,不会出现不同的结果。常见对全局变量或者静态变量进行操作,并且没有锁保护的情况下,会出现该问题。
常见的线程不安全的情况
- 不保护共享变量的函数
- 函数状态随着被调用,状态发生变化的函数
- 返回指向静态变量指针的函数
- 调用线程不安全函数的函数
常见的线程安全的情况
- 每个线程对全局变量或者静态变量只有读取的权限,而没有写入的权限,一般来说这些线程是安全的类或者接口对于线程来说都是原子操作
- 多个线程之间的切换不会导致该接口的执行结果存在二义性
可重入与线程安全的联系
- 函数是可重入的,那就是线程安全的
- 函数是不可重入的,那就不能由多个线程使用,有可能引发线程安全问题
- 如果一个函数中有全局变量,那么这个函数既不是线程安全也不是可重入的。
可重入与线程安全区别
- 可重入函数是线程安全函数的一种
- 线程安全不一定是可重入的,而可重入函数则一定是线程安全的。
- 如果将对临界资源的访问加上锁,则这个函数是线程安全的,但如果这个重入函数若锁还未释放则会产生死锁,因此是不可重入的。
死锁
死锁是指在一组进程中的各个线程均占有不会释放的资源,但因互相申请被其他线程所占用不会释放的资源而处于的一种永久等待状态。
一个执行流不释放锁,而且还重复申请锁也会导致死锁。
死锁四个必要条件
- 互斥条件:一个资源每次只能被一个执行流使用
- 请求与保持条件:一个执行流因请求资源而阻塞时,对已获得的资源保持不放(一个线程去申请另一把锁,但是不释放自己的锁)
- 不剥夺条件:一个执行流已获得的资源,在末使用完之前,不能强行剥夺
- 循环等待条件:若干执行流之间形成一种头尾相接的循环等待资源的关系(线程A拿着自己的锁去申请线程B的锁,线程B也拿着自己的锁去申请线程A的锁,他们两造成一个环路条件)
避免死锁
- 破坏死锁的四个必要条件
- 加锁顺序一致
- 避免锁未释放的场景
- 资源一次性分配
避免死锁算法
死锁检测算法(了解)
比如多线程中有一个线程不做其他事情,专门检查有没有死锁的情况,如果有就去把那个锁释放掉。
银行家算法(了解
2.线程同步
同步概念与竞态条件
同步:在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从而有效避免饥饿问题,叫做同步。
竞态条件:因为时序问题,而导致程序异常,我们称之为竞态条件。
生产者消费者模型
为何要使用生产者消费者模型
生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。
我们常见的生产者消费者模型比如 超市,供货商给超市供应商品,而学生或消费者到超市买商品或者说消费商品。那么供货商供应商品时,消费人群可以干任何事,可以在工作,在玩,在购买商品,当然消费者在购买商品时,供货商也可以干任何事,这个生产的过程和消费的过程是解耦的。我们把超市这个临时保存产品的场所叫做缓冲区。
如果没有超市这个容器,那么你购买商品需要到供货商的厂区去购买,供货商不是说你到了他们厂区他就直接把商品给你了(他根本不知道你要什么),而是你到了之后提出需求后才开始生产。那么这个生产的过程需要你去等,等到生产完成,你拿回去自己使用这个过程,这些都需要时间支撑。此时生产者和消费者之间是强耦合的,也就是说消费者需要,生产者才去生产,消费者等待生产过程,消费者再去消费商品,这个时间消耗太大,不论是生产者还是消费者成本都太高了。
- 生产者和生产者的关系:互斥 -> 容器的容量就那么大,同一块区域你放了商品别人就放不了商品了,当然也不能存在两个人同时往一块区域放商品的情况,这会出现数据不一致问题。
- 消费者和消费者的关系:互斥 -> 商品是有限的,同一个商品你拿走了别人就不能拿了,当然也不能存在两个线程拿同一份资源的情况,也会造成数据不一致问题。
- 生产者和消费者的关系:互斥与同步 ->我们知道计算机世界里数据是可以被覆盖的,互斥是因为消费者在这个区域拿商品,生产者正好也过来放商品,可能会出现消费者还没有把数据完全拿走,而生产者就把这个数据覆盖成其他的数据了。同步是超市里的商品空了,生产者放商品,消费者才能拿商品,或者超市的商品满了,消费者拿走商品,生产者才能放商品。
总结:“321原则”
- 三种关系:生产者和生产者的关系(互斥),消费者和消费者的关系(互斥),生产者和消费者的关系(互斥(保证共享资源的安全性),同步) 他们之间的关系是对于商品(数据)而言的。
- 两种角色:生产者线程,消费者线程
- 一个交易场所:一段特定结构的缓冲区。
只要我们想写生产消费模型,其实本质工作就是维护“321原则”。
生产者消费者模型优点
1.生产者线程和消费者线程进行解耦。
2.支持生产和消费的一段时间的忙闲不均的问题
3.生产者和消费者并发的执行,能提高效率。
条件变量
当一个线程互斥地访问某个变量时,它可能发现在其它线程改变状态之前,它什么也做不了。例如一个线程访问队列时,发现队列为空,它只能等待,直到其它线程将一个节点添加到队列中。这种情况就需要用到条件变量。
当我们把一份临界资源用锁保护起来时,第一次到来的线程优先获得访问权,因为在锁之后一般有一个条件判断,该线程通过条件判断访问临界资源完毕后解锁,正常情况就应该换先一个线程访问临界资源了。但是很有可能该线程是一直循环访问,第一个循环结束后,他再次上来申请锁,判断条件是否满足,不满足条件后解锁...由于他得天独厚的条件,导致它申请锁会很快,那么该线程会一直重复申请锁,判断条件,不满足条件,解锁这个过程。导致其他线程申请不到锁,造成线程饥饿问题。此时就需要条件变量来解决问题。
条件变量是pthread库提供的一个数据类型,每当有线程访问临界资源时,申请互斥锁不成功,如果没有条件变量就会被阻塞挂起,有条件变量的话,就会通过一个接口将线程的PCB链接到条件变量里的队列中进行等待,上一个线程访问完临界资源后,它再想继续访问临界资源,对不起,要去条件队列里等待,此时会调用接口唤醒条件队列中的第一个线程,让他进行访问。这样就不会出现线程饥饿问题了。
举个列子更好的理解条件变量:
我们都知道在面试的时候,会一个一个的叫人去办公室面试,这个面试官就相当于共享资源,那么我们肯定不能一窝蜂的都挤到面试官面前等待提问,所以办公室就相当于互斥锁。如果仅仅只有互斥锁,那么一群人都在办公室门口等待被叫去面试,我们假设面试官只知道面试,不管面试的人是谁。那么第一个人面试完毕,刚走出办公室,门一关,那个人手都还在门把手上,他又直接把门打开进去面试了,就这样他一直重复的去做这样的事情,导致大家都面试不了,造成饥饿问题。
这时就需要一个管理者,他对所有面试的人说,现在这有一个等待区,大家赶紧去排队领号,我一会叫号,叫到号的人才能去面试,没有号的人就别想面试了。此时大家就赶紧去排队,等待叫号,然后面试,不一会就井井有序了。我们把这个等待区和管理者叫做条件变量。
条件变量函数
初始化
int pthread_cond_init(pthread_cond_t *restrict cond, const pthread_condattr_t *restrictattr);
参数:
cond:要初始化的条件变量
attr:NULL
销毁
int pthread_cond_destroy(pthread_cond_t *cond);
cond:要销毁的条件变量
等待条件
int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex);
作用:调用该函数的线程进行等待
参数:
cond:要在这个条件变量上等待
mutex:互斥量,后面详细解释int pthread_cond_timedwait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex,const struct timespec *restrict abstime);
作用:设置一个等待时间,时间到了自动返回
唤醒等待
int pthread_cond_broadcast(pthread_cond_t *cond);
作用:唤醒一批线程,唤醒在该条件队列下的所有线程
int pthread_cond_signal(pthread_cond_t *cond);
作用:唤醒一个线程
简单的使用:唤醒条件队列中的一个线程
唤醒条件队列中的所有线程
完整代码:lesson11/test5/1_test_cond/testCond.cc · 晚风不及你的笑/MyCodeStorehouse - 码云 - 开源中国 (gitee.com)
基于BlockingQueue的生产者消费者模型
在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)
C++ queue模拟阻塞队列的生产消费模型
这里的代码先以单生产,单消费来演示。
#include <sys/types.h>
#include <unistd.h>
#include <ctime>
#include "BlockQueue.hpp"
#include "Task.hpp"
const string oper = "+-*/%"; //运算符号
int mymath(int x,int y,char op)
{
int result = 0;
switch(op)
{
case '+':
result = x + y;
break;
case '-':
result = x - y;
break;
case '*':
result = x * y;
break;
case '/':
{
if(y == 0)
{
cerr<<"div zero err" << endl;
result = -1;
}
else
result = x / y;
break;
}
case '%':
{
if(y == 0)
{
cerr<<"mod zero err" << endl;
result = -1;
}
else
result = x % y;
break;
}
}
return result;
}
void* consumer(void* args) //消费者线程
{
BlockQueue<Task>* bq = static_cast<BlockQueue<Task>*>(args);
while(true)
{
//消费活动
Task t;
bq->pop(&t);
cout<<"消费任务:" << t() << endl;
// sleep(1);
}
return nullptr;
}
void* productor(void* args) //生产者线程
{
BlockQueue<Task>* bq = static_cast<BlockQueue<Task>*>(args);
while(true)
{
//生产活动
int x = rand()%10+1;
int y = rand()%5;
int operidx = rand()% oper.size(); //运算符下标
Task t(x,y,oper[operidx],mymath);
bq->push(t);
cout<< "生产任务:" << t.to_Task_string() << endl;
sleep(1);
}
return nullptr;
}
int main()
{
srand((unsigned long)time(0) ^ getpid());
BlockQueue<Task>* bq = new BlockQueue<Task>();
pthread_t c,p;
pthread_create(&c,nullptr,consumer,bq);
pthread_create(&p,nullptr,productor,bq);
pthread_join(c,nullptr);
pthread_join(p,nullptr);
delete bq;
return 0;
}
上面的代码只是主函数的接口,具体的实现有兴趣的自己看看把:lesson11/test5/BlockQueue · 晚风不及你的笑/MyCodeStorehouse - 码云 - 开源中国 (gitee.com)
在此基础上多添加了一个存储文件的线程,这么就不过多赘述了,想了解的自己看:
lesson11/test5/BlockQueue2 · 晚风不及你的笑/MyCodeStorehouse - 码云 - 开源中国 (gitee.com)
而多生产和多消费可以基于单生产单消费的代码直接进行添加线程,由于我们只有一个队列,一把锁,所以不管多个线程生产还是多个线程消费,都只有一个线程能申请到锁,
在前面说过生产者消费者模型很高效,那么它高效在哪里呢?
它并不是高效在阻塞队列上,而是可以在生产之前和消费之后,让线程并发的执行。因为我们今天只是模拟任务,但如果是真的任务,那么生产任务和消费任务一定特别的耗时间,所以让他们并发的执行,就会大大提高效率。你生产你的,我消费我的,互不干涉,只有在队列为空或为满时才会有一方的线程暂时等待。
说一说上面的代码一些“不足”的地方:
我们知道一个线程操作临界资源时是先加锁,在检测是否满足条件,根据检测结果进行下一步动作。可是呢,我们无法在加锁之前就得知此次访问是否满足条件,这样的话就会造成不管满不满足条件,都要先加锁,然后检测,满足就挂起,这样线程啥也没干,转了一圈,还浪费时间。如果我们事先就知道判断条件满不满足,那线程就该执行执行,该挂起挂起,就不用每次都加锁了。此时就要引入信号量的概念了。
POSIX信号量
POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 但POSIX可以用于线程间同步。
回顾一下信号量的概念:信号量是什么?
信号量本质是一把计数器,这个计数器是用来衡量临界资源中资源数量多少的。只要拥有了信号量那么在未来就一定能拥有临界资源的一部分。申请信号量的本质就是对临界资源中特定小块的预定机制。为什么要有信号量呢?
信号量可以让线程在访问临界资源前就知道临界资源的使用情况,可以通过信号量评估资源的使用率。
信号量一般都需要什么操作?
我们知道信号量是计数器,线程要申请信号量,信号量就一定要被所有线程都能看到,那么信号量本身就是公共资源。由于信号量是公共资源,那么申请(p)信号量资源(sem--)和归还(v)信号量资源(sem++)的操作就必须是原子的。信号量的核心操作:PV原语。
初始化信号量
#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
参数:
pshared:0表示线程间共享,非零表示进程间共享
value:信号量初始值
销毁信号量
int sem_destroy(sem_t *sem);
等待信号量
功能:等待信号量,会将信号量的值减1
int sem_wait(sem_t *sem); //P()
发布信号量
功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1。
int sem_post(sem_t *sem);//V()
基于环形队列的生产消费模型
环形队列采用数组模拟,用模运算来模拟环状特性。环形结构起始状态和结束状态都是一样的,不好判断为空或者为满,所以可以通过加计数器或者标记位来判断满或者空。另外也可以预留一个空的位置,作为满的状态,但是我们现在有信号量这个计数器,就很简单的进行多线程间的同步过程。
生产者和消费者在什么情况下会访问同一个位置呢?1.队列为空的时候,2.队列为满的时候,其他情况下,生产者和消费者访问的是不同的位置。也就是说在环形队列中,大部分情况下,单生产和单消费是可以并发的执行的,只有在满或者空时,才会有互斥与同步的问题!
//RingQueue.hpp
#pragma once
#include <iostream>
#include <vector>
#include <cassert>
#include <semaphore.h>
#include <pthread.h>
using namespace std;
static const int gcap = 5;
template<class T>
class RingQueue
{
public:
void P(sem_t& sem) //获取信号量资源
{
int n = sem_wait(&sem);
assert(n == 0);
(void)n;
}
void V(sem_t& sem) //归还信号量资源
{
int n = sem_post(&sem);
assert(n == 0);
(void)n;
}
public:
RingQueue(const int& cap = gcap)
:_queue(cap),_cap(cap)
{
int n = sem_init(&_spaceSem,0,_cap);
assert(n == 0);
n = sem_init(&_dataSem,0,0);
assert(n == 0);
_ProductorStep = _ConsumerStep = 0;
pthread_mutex_init(&_pmutex,nullptr);
pthread_mutex_init(&_cmutex,nullptr);
}
void push(const T& in)
{
P(_spaceSem);
pthread_mutex_lock(&_pmutex);
_queue[_ProductorStep++] = in;
_ProductorStep %= _cap;
pthread_mutex_unlock(&_pmutex);
V(_dataSem);
}
void pop(T* out)
{
P(_dataSem);
pthread_mutex_lock(&_cmutex);
*out = _queue[_ConsumerStep++];
_ConsumerStep %= _cap;
pthread_mutex_unlock(&_cmutex);
V(_spaceSem);
}
~RingQueue()
{
sem_destroy(&_spaceSem);
sem_destroy(&_dataSem);
pthread_mutex_destroy(&_pmutex);
pthread_mutex_destroy(&_cmutex);
}
private:
vector<T> _queue;
int _cap; //队列容量
sem_t _spaceSem; //生产者信号量,->空间资源
sem_t _dataSem; //消费者信号量,->数据资源
int _ProductorStep; //生产者在环形队列的下标
int _ConsumerStep; //消费者在环形队列的下标
pthread_mutex_t _pmutex;
pthread_mutex_t _cmutex;
};
//mian.cc
#include <ctime>
#include <cstdlib>
#include <unistd.h>
#include "RingQueue.hpp"
#include "Task.hpp"
string Selfname()
{
char name[128];
snprintf(name,sizeof(name),"thread[0x%x]",pthread_self());
return name;
}
void* ProductorRoutine(void* args)
{
// RingQueue<int>* rq = static_cast<RingQueue<int>* >(args);
RingQueue<Task>* rq = static_cast<RingQueue<Task>* >(args);
while(true)
{
//生产数据
// int data = rand()%10+1;
// rq->push(data);
// cout<< "生产数据: "<< data <<endl;
//获取或构建任务,是需要花时间的
int x = rand()%10+1;
int y = rand()%5;
char op = oper[rand()%oper.size()]; // int operIdx = rand()%oper.size();
Task t(x,y,op,mymath); // Task t(x,y,oper[operIdx],mymath);
rq->push(t);
cout<< Selfname()<<",生产任务:"<< t.to_Task_string() <<endl;
sleep(1);
}
}
void* consumerRoutine(void* args)
{
// RingQueue<int>* rq = static_cast<RingQueue<int>* >(args);
RingQueue<Task>* rq = static_cast<RingQueue<Task>* >(args);
while(true)
{
// int data;
// rq->pop(&data);
// cout<< "获取数据: "<< data <<endl;
//消费或执行任务,是需要花时间的
Task t;
rq->pop(&t);
cout<<Selfname() <<",计算任务:"<< t() << endl;
// sleep(1);
}
}
//多生产,多消费
int main()
{
srand((unsigned int)time(nullptr) ^ getpid() ^ 0x1241235);
RingQueue<Task>* rq = new RingQueue<Task>();
pthread_t p[5],c[5];
for(int i = 0;i<5;++i) pthread_create(p+i,nullptr,ProductorRoutine,rq);
for(int i = 0;i<5;++i) pthread_create(c+i,nullptr,consumerRoutine,rq);
for(int i = 0;i<5;++i) pthread_join(p[i],nullptr);
for(int i = 0;i<5;++i) pthread_join(c[i],nullptr);
delete rq;
return 0;
}
// //单生产,单消费
// int main()
// {
// srand((unsigned int)time(nullptr) ^ getpid() ^ 0x1241235);
// RingQueue<Task>* rq = new RingQueue<Task>();
// pthread_t p[5],c[5];
// pthread_create(&p,nullptr,ProductorRoutine,rq);
// pthread_create(&c,nullptr,consumerRoutine,rq);
// pthread_join(p,nullptr);
// pthread_join(c,nullptr);
// delete rq;
// return 0;
// }
完整代码:
lesson11/6环形队列的生产消费模型 · 晚风不及你的笑/MyCodeStorehouse - 码云 - 开源中国 (gitee.com)