💖作者:小树苗渴望变成参天大树🎈
🎉作者宣言:认真写好每一篇博客💤
🎊作者gitee:gitee✨
💞作者专栏:C语言,数据结构初阶,Linux,C++ 动态规划算法🎄
如 果 你 喜 欢 作 者 的 文 章 ,就 给 作 者 点 点 关 注 吧!
文章目录
- 前言
- 一、线程同步(条件变量)
- 二、cp模型
- 1.1模型的补充
- 2.2 案例演示
- 2.3 CP模型记忆
- 总结
前言
上一篇博主花了很长时间带大家理解什么是线程,线程的作用,缺点,以及怎么去使用,相信大家已经自己去实践了一下,今天我们就来讲讲线程的一个很常见但也很重要的模型–cp模型,在讲解这个模型之前,博主要先讲解一下条件变量,因为他涉及到同步,一会博主都会详细介绍的,所以大家不用担心,话不多说,我们开始进入正文讲解。
一、线程同步(条件变量)
之前讲解了线程的互斥,简单的理解为对于一份临界资源只允许一个线程可以去访问他,而同步看上去和互斥是相反的词,实际不是的,在上一篇关于线程的讲解第六章节的时候提到多的抢票程序,说到第四点的时候就发现票被同一个线程抢走了,原因是在从线程的时间片内,刚释放锁的线程离该锁最近,别的线程还要唤醒,所以不做任何措施的线程刚释放锁的,就会立马去申请锁,所以我们的操作系统认为这样不好,一个共享资源让一个线程都去占用了,其他线程怎么办,所以就要想办法,你线程如果刚释放锁,就必须去后面排队,不能在去申请锁了。再去申请就会失败。
有了上面的知识铺垫,我们才有了线程同步的概念:在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从而有效避免饥饿问题,叫做同步,那线程同步怎么做到呢?? ----条件变量。
当一个程序是多线程的,他每次竞争完锁之后都要去后面排队,是哪个后面呢??我们其中一个线程拥有锁之后,其他线程去申请锁就会失败,从而形成一个等待队列,而刚释放锁的线程,他想申请锁也会失败,所以去等待队列后面去排队,前提是申请锁失败(临界资源不就绪),才会去等待。 当锁被释放后,就要唤醒等待队列中的线程去申请锁,去访问临界资源,让程序继续去执行。
有来上面的讲解,我们知道条件变量必须有两个属性,一个是等待队列,一个是唤醒线程的标志位,我们的条件变量是锁的使用差不多,需要初始化。来看讲解:
我们创建多线程程序,每个线程对全局变量进行有顺序的加加:
#include<iostream>
#include<pthread.h>
#include<vector>
#include <unistd.h>
using namespace std;
int cnt=0;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
void* func(void* arg)
{
uint64_t number = (uint64_t)arg;
std::cout << "pthread: " << number << " create success" << std::endl;
while(true)
{
pthread_mutex_lock(&mutex);
std::cout << "pthread: " << number<< " ,cnt:" <<cnt++<< std::endl;
pthread_cond_wait(&cond,&mutex);
pthread_mutex_unlock(&mutex);
}
}
int main()
{
vector<pthread_t> tids;
pthread_t tid;
for(uint64_t i=1;i<=4;i++)
{
//注意:最后一个参数不要传地址进去,因为线程的栈区不是共享的,这会导致后面的线程名都是i=4的。直接传拷贝就可以了。
pthread_create(&tid,NULL,func,(void*)i);//创建4个线程
usleep(1000);
}
while(true)//让主线程来实现唤醒操作。
{
sleep(1);
pthread_cond_signal(&cond);//唤醒一个线程
//pthread_cond_broadcast(&cond);//唤醒所有线程
cout<<"主线程唤醒一个线程"<<endl;
// cout<<"主线程唤醒所有线程"<<endl;
}
for(auto tid:tids)//主线程进行等待。
{
pthread_join(tid,NULL);
}
return 0;
}
通过结果来看我们达到了我们想要的效果,来解释程序的代码:
- 我们的条件变量也像锁一样需要进行初始化,可以使用函数,也可以使用全局的初始化
- 使用函数进行初始化就需要使用pthread_cond_destroy()这个函数进行销毁,全局初始化的,则不用,这个和锁的使用是一样的。
- 我们的条件变量可以一次唤醒等待队列的一个线程,通常都是队头的,也可以一次唤醒队列中所有的线程。
为什么我们的等待要放在加锁解锁之间??
先改造我们之前RAII风格的抢票程序,让他变得也有顺序。
我们要加一个条件变量进去:
myticket.hpp:
#pragma once
#include <iostream>
#include <cstdio>
#include <cstring>
#include <vector>
#include <unistd.h>
#include <pthread.h>
using namespace std;
class mylock
{
public:
mylock(pthread_mutex_t*lock,pthread_cond_t* cond)
:lock_(lock),cond_(cond)
{}
void lock()
{
pthread_mutex_lock(lock_);
}
void unlock()
{
pthread_mutex_unlock(lock_);
}
void wait()
{
pthread_cond_wait(cond_,lock_);
}
~mylock()
{
}
private:
pthread_mutex_t* lock_;
pthread_cond_t* cond_;
};
class lockguard
{
public:
lockguard(pthread_mutex_t*lock,pthread_cond_t* cond)
:mutex_(lock,cond)
{
mutex_.lock();
}
void wait()
{
mutex_.wait();
}
~lockguard()
{
mutex_.unlock();
}
private:
mylock mutex_;
};
mythread.cc:
z#include"myticket.hpp"
#define NUM 4
pthread_mutex_t lock=PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond=PTHREAD_COND_INITIALIZER;
class threadData
{
public:
threadData(int number)
{
threadname = "thread-" + to_string(number);
}
public:
string threadname;
};
int tickets = 100; // 用多线程,模拟一轮抢票
void *getTicket(void *args)
{
threadData *td = static_cast<threadData *>(args);
const char *name = td->threadname.c_str();
while (true)
{
lockguard lockg(&lock,&cond);//只在这歌循环里面有效,出作用域就销毁
if(tickets > 0)
{
//usleep(100);
printf("who=%s, get a ticket: %d\n", name, tickets);
tickets--;
lockg.wait();
}
else
break;
usleep(13);
}
printf("%s ... quit\n", name);
return nullptr;
}
int main()
{
vector<pthread_t> tids;
vector<threadData *> thread_datas;
for (int i = 1; i <= NUM; i++)
{
pthread_t tid;
threadData *td = new threadData(i);
thread_datas.push_back(td);
pthread_create(&tid, nullptr, getTicket, thread_datas[i - 1]);
tids.push_back(tid);
sleep(1);
}
while(true)
{
sleep(1);
pthread_cond_signal(&cond); //唤醒在cond的等待队列中等待的一个线程,默认都是第一个
//pthread_cond_broadcast(&cond);
std::cout << "signal one thread..." << std::endl;
}
for (auto thread : tids)
{
pthread_join(thread, nullptr);
}
for (auto td : thread_datas)
{
delete td;
}
return 0;
}
通过一开始上面两个程序,我们发现条件变量是可以实现线程同步的,我们的条件变量的操作一种就四个函数,初始化和销毁没啥可讲的,唤醒肯定是由别的线程唤醒的,自己都在等待的不可能自己把自己唤醒的,一会讲解cp问题的时候会更加好理解,我们的最后一个函数就是等待,**我们把线程等待放到了加锁和解锁之间,我们上面说过,当我们申请资源不就绪的时候就会等待成功, 当我是持有锁的时候你让我去等待,那别的线程不就拿不到锁了,不用担心,我们等待函数让持有锁的线程去等待,会自动释放锁的。这个问题解决了,我们为什么去等待,一定是临界资源不就绪了 ,你怎么知道临界资源就绪还是不就绪呢??是你判断出来的,判断是访问临界资源吗??答案是的,所以判断必须在加锁之后,这也就导致了等待在加锁和解锁之间。对于第一个和第二个程序,他们的线程申请锁成功,获取到临界资源,他们不去等待而是在他们访问临界资源的时候,然后自己直接去等待,这样别人就可以申请到锁去访问了。 **
二、cp模型
上面说了那么多,我们终于将条件变量讲解完毕了,可以来讲解cp模型,他实际是叫生产者消费者模型,这个模型和我们生活中的案例非常符合,接下来讲解一个小故事带大家理解这个模型。
在我们日常生活中去的比较多的就是超市了,我们去超市直接去买东西,不需要等产品生产好了在去拿,而超市等商品没有了,直接去生产商去进货,有了超市的存在消费者和生产者之间存在的差异就抵消了,如果我们去生产商进行消费,那我们还要等生产出来才可以拿到,而且一次生产的特别少,这样是不行的。所以这个超市就是生产者和消费者共享的一个地方,让我们消费者和生产可以共同实现同步互斥。
1.模型的优点:
我们通过超市实现了消费者和生产者的忙闲不均。
将生产者和消费者实现了解藕。
支持并发(一会细说)
2.模型内部的关系:
(1)生产者与生产者
他们是互斥关系,多个生产者之间要分别给超市供货,好比同一个货架上已经放了一个生产商的货物,另一个就不能放了,货架多,让我们觉得生产者不是互斥的。所以他们之间要 互斥
(2)消费者与消费者
他们是互斥关系,虽然超市里面好多消费者一起去购物,但是同一个商品只能有一个消费者获得,当商品不足的时候,可能就会有多个消费者去抢同一个商品,所以他们之间要 互斥
(3)生产者与消费者
当我们消费者在进行消费的时候,你生产者过来把自己的商品放上去,那不就把之前的商品给覆盖了吗,万一消费者想要之前的商品不就获取不到了,所以两者要互斥,有一天,我们顾客想要打电话给超市问他方便面有没有,此时一直打不通,原因是我们生产方便面的产家一直给超市打电话,你要不要方便面,导致消费者一直打不进去电话,此时生产者就一直占有超市这个共享资源,所以刚打完电话就不要打了,排队去,五天后在,这样消费者才可以进行消费,所以生产者和消费者也要保持 同步 关系
我们先来实现单生产单消费的模型,然后在改。
对于这个超市,他的作用就是效率高,他的本质大号的缓存空间,今天我们实现的是基于BlockingQueue的生产者消费者模型
所以我们要有一个阻塞队列:
main.cc:
#include"BlockQueue.hpp"
void* Productor(void* arg)
{
BlockQueue<int>*bq=static_cast<BlockQueue<int>*>(arg);
int data=0;
while(true)
{
data++;
bq->push(data);
cout<<"生产者生产了数据:"<<data<<endl;
sleep(1);
}
}
void* Consumer(void* arg)
{
BlockQueue<int>*bq=static_cast<BlockQueue<int>*>(arg);
while(true)
{
int data=bq->pop();
cout<<"消费者消费了数据:"<<data<<endl;
}
}
int main()
{
pthread_t productor;//定义一个生产者线程
pthread_t consumer;//定义一个消费者线程
BlockQueue<int>* bq=new BlockQueue<int>();//这是堆区,可以之间传地址的,堆区线程共享,之前是栈区的i
pthread_create(&productor,nullptr,Consumer,bq);//创建一个消费者线程
pthread_create(&consumer,nullptr,Productor,bq);//创建一个生产者线程
//主线程什么事情都不干,监视两个线程就可以,唤醒是两个线程互相唤醒,不像之前讲解的需要主线程来进行唤醒。
pthread_join(consumer,nullptr);
pthread_join(productor,nullptr);
delete bq;
return 0;
}
BlockQueue.hpp:
#pragma once
#include<iostream>
#include<pthread.h>
#include<unistd.h>
#include<queue>
using namespace std;
template<class T>//模板类
class BlockQueue
{
static const int defalutnum = 20;//阻塞队列的大小
public:
BlockQueue(int maxSize=defalutnum):maxSize(maxSize)
{
pthread_mutex_init(&lock,nullptr);//给锁进行初始化
pthread_cond_init(&full,nullptr);//给两个条件变量进行初始化
pthread_cond_init(&empty,nullptr);
//控制一个高低,不让一生产就消费,也不让一消费就生产
lower_water=maxSize/3;
high_water=maxSize*2/3;
}
// 为什么wait是放在加锁喝解锁之间的,如果没有判断大家应该怎么理解,线程刚加锁就被放到条件变量下main去等待,那有什么意义,并且是持有锁的情况去放到条件变量等待的,那别人怎么拿到锁呢,原因是wait这个函数将持有锁的线程放到条件变量去等待回自动释放锁
//话说回来,我们刚才不加判断,直接去等待,那么这个加锁就没有意义,那什么时候去等待合适呢??答案是不符合条件的时候,临界资源不就绪的时候,别人在用,是我们通过if判断出来的,是我们程序员自己知道的,那么我们进行判断的时候是不是也在访问临界资源,就
//注定我们的等待是在加锁和解锁之间的,这个时候大家应该理解等待为什么放在加锁解锁之间了吧。
T pop()
{
pthread_mutex_lock(&lock);//接下来要进行访问阻塞队列是临界资源需要加锁
if(q.empty())//如果队列为空,消费者就不能进行消费,就要进入空的条件变量进行等待
{
pthread_cond_wait(&empty,&lock);//临界资源不就绪去排队
}
T data =q.front();//消费数据
q.pop();
if(q.size()<=lower_water) pthread_cond_signal(&full);///消费者消费一个,说明队列肯定不为满,所以唤醒一个生产者
pthread_mutex_unlock(&lock);//进行解锁,给下一个要访问的线程进行使用
return data;
}
void push(T data)
{
pthread_mutex_lock(&lock);//接下来要进行访问阻塞队列是临界资源需要加锁
if(q.size()==maxSize)//如果队列已经满了,生产者就不能进行生产,就要进入满的条件变量进行等待,临界资源不就绪
{
pthread_cond_wait(&full,&lock);//临界资源不就绪去排队
}
q.push(data);//生产数据
if(q.size()>=high_water) pthread_cond_signal(&empty);//生产者生产一个,说明队列肯定不为空,所以唤醒一个消费者,通过其他线程来唤醒另一个线程。
pthread_mutex_unlock(&lock);//进行解锁,给下一个要访问的线程进行使用
}
~BlockQueue()
{
//因为锁和条件变量都是全局初始化的,所以需要销毁
pthread_mutex_destroy(&lock);//销毁锁
pthread_cond_destroy(&full);//销毁满条件变量
pthread_cond_destroy(&empty);//销毁空条件变量
}
private:
queue<T> q;//阻塞队列,相对于超市
pthread_mutex_t lock;//定义一把锁
pthread_cond_t full;//阻塞队列满的时候生产者进行排队的条件变量
pthread_cond_t empty;//阻塞队列空的时候消费者进行排队的条件变量
int maxSize;//队列最大值
int lower_water;
int high_water;
};
我们看到效果了,那我们多生产多消费怎么去实现呢,因为只有一把锁,所以我们可以一次创建多生产多消费模型,也可以维护上面三种关系,来看改动的代码:
int main()
{
pthread_t productor;//定义一个生产者线程
pthread_t consumer;//定义一个消费者线程
vector<pthread_t> prods;//定义一个生产者线程组
vector<pthread_t> conss;//定义一个消费者线程组
BlockQueue<int>* bq=new BlockQueue<int>();//这是堆区,可以之间传地址的,堆区线程共享,之前是栈区的i
for(uint64_t i=1;i<=4;i++)//创建4个生产者
{
pthread_create(&productor,nullptr,Consumer,bq);//创建一个生产者线程
prods.push_back(productor);//将生产者线程放入生产者线程组
}
for(uint64_t i=1;i<=4;i++)//创建四个消费者
{
pthread_create(&consumer,nullptr,Productor,bq);//创建一个消费者线程
conss.push_back(consumer);//将消费者线程放入消费者线程组
}
//主线程什么事情都不干,监视两个线程就可以,唤醒是两个线程互相唤醒,不像之前讲解的需要主线程来进行唤醒。
for(auto i:prods)
{
pthread_join(i,nullptr);
}
for(auto i:conss)
{
pthread_join(i,nullptr);
}
delete bq;
return 0;
}
#pragma once
#include<iostream>
#include<pthread.h>
#include<unistd.h>
#include<queue>
using namespace std;
template<class T>//模板类
class BlockQueue
{
static const int defalutnum = 20;//阻塞队列的大小
public:
BlockQueue(int maxSize=defalutnum):maxSize(maxSize)
{
pthread_mutex_init(&lock,nullptr);//给锁进行初始化
pthread_cond_init(&full,nullptr);//给两个条件变量进行初始化
pthread_cond_init(&empty,nullptr);
//控制一个高低,不让一生产就消费,也不让一消费就生产
// lower_water=maxSize/3;
// high_water=maxSize*2/3;
}
// 为什么wait是放在加锁喝解锁之间的,如果没有判断大家应该怎么理解,线程刚加锁就被放到条件变量下main去等待,那有什么意义,并且是持有锁的情况去放到条件变量等待的,那别人怎么拿到锁呢,原因是wait这个函数将持有锁的线程放到条件变量去等待回自动释放锁
//话说回来,我们刚才不加判断,直接去等待,那么这个加锁就没有意义,那什么时候去等待合适呢??答案是不符合条件的时候,临界资源不就绪的时候,别人在用,是我们通过if判断出来的,是我们程序员自己知道的,那么我们进行判断的时候是不是也在访问临界资源,就
//注定我们的等待是在加锁和解锁之间的,这个时候大家应该理解等待为什么放在加锁解锁之间了吧。
T pop()
{
pthread_mutex_lock(&lock);//接下来要进行访问阻塞队列是临界资源需要加锁
if(q.empty())//如果队列为空,消费者就不能进行消费,就要进入空的条件变量进行等待
{
pthread_cond_wait(&empty,&lock);//临界资源不就绪去排队
}
T data =q.front();//消费数据
q.pop();
cout<<"thread_id:"<<pthread_self()<<",消费者消费了数据:"<<data<<endl;
usleep(12);
if(q.size()!=20) pthread_cond_signal(&full);///消费者消费一个,说明队列肯定不为满,所以唤醒一个生产者
pthread_mutex_unlock(&lock);//进行解锁,给下一个要访问的线程进行使用
return data;
}
bool push(T data)
{
pthread_mutex_lock(&lock);//接下来要进行访问阻塞队列是临界资源需要加锁
if(q.size()==maxSize)//如果队列已经满了,生产者就不能进行生产,就要进入满的条件变量进行等待,临界资源不就绪
{
pthread_cond_wait(&full,&lock);//临界资源不就绪去排队
}
q.push(data);//生产数据
cout<<"thread_id:"<<pthread_self()<<",生产者生产了数据:"<<data<<endl;
usleep(13);
if(q.size()!=0) pthread_cond_signal(&empty);//生产者生产一个,说明队列肯定不为空,所以唤醒一个消费者,通过其他线程来唤醒另一个线程。
pthread_mutex_unlock(&lock);//进行解锁,给下一个要访问的线程进行使用
}
~BlockQueue()
{
//因为锁和条件变量都是全局初始化的,所以需要销毁
pthread_mutex_destroy(&lock);//销毁锁
pthread_cond_destroy(&full);//销毁满条件变量
pthread_cond_destroy(&empty);//销毁空条件变量
}
private:
queue<T> q;//阻塞队列,相对于超市
pthread_mutex_t lock;//定义一把锁
pthread_cond_t full;//阻塞队列满的时候生产者进行排队的条件变量
pthread_cond_t empty;//阻塞队列空的时候消费者进行排队的条件变量
int maxSize;//队列最大值
// int lower_water;
// int high_water;
};
博主未来让大家看的更加清楚,将高低水位线去掉了。
1.1模型的补充
对于cp模型还有几点要补充
- 我们的生产商不但要往超市里面放商品,他也要抽时间生产商品,对于这个模型,不止要会放数据到阻塞队列里面,还要会获取数据,一般从网络或者用户去获取,而获取数据也要花时间。
- 对于消费者,我们不可能天天来超市消费,等我们买的商品使用完了才去购买,对于模型也一样,我们取到数据,还要进行处理,处理也要花时间。
我们cp模型前面说过效率较高,并发访问,这是为什么?我们只有一把锁,每次只能有一个线程访问阻塞队列,者不是串行访问吗??确实没错,但是当我们其中一个线程访问时,其他线程在获取数据或者处理数据,这样整体上就实现了并发访问,今天没有合适的场景,但是我么你不嗯呢个忽略cp模型有这个特性
伪唤醒: 重点
我们看到这个代码分别是生产者和消费者的代码,我们圈住的部分,假设我们的生产者生产了一个数据,此时阻塞队列刚好满了,唤醒消费者去访问了,消费者访问了一个,空出来一个,消费者
又去唤醒生产者去生产,此时消费者采取了从全部唤醒策略,将多个生产者线程都唤醒了,假设三个生产者线程必须重新去申请锁,才可以去访问,没有申请到的两个线程被挂起等待,我们申请锁不是执行上面第一行申请锁的函数,而是在等待函数内部去做的,申请成功返回,继续往下面执行,此时申请到锁的生产者线程生产了一个数据,此时队列又满了,然后去唤醒消费者线程,此时不止有消费者线程去申请锁,还有刚才两个被挂起的生产者线程也等着申请锁呢,万一此时其中一个申请到锁,在往里面插数据,就会导致益处,显然这样是不行的,所以我们不能使用if判断,而是要使用while判断。
2.2 案例演示
我们刚才写的是整形,接下来写一个计算器,你发数据,我给你处理数据,就可以完成任务的派发:
Task.hpp:
#pragma once
#include <iostream>
#include <string>
std::string opers="+-*/%";
enum{
DivZero=1,
ModZero,
Unknown
};
class Task
{
public:
Task(int x, int y, char op) : data1_(x), data2_(y), oper_(op), result_(0), exitcode_(0)
{
}
void run()
{
switch (oper_)
{
case '+':
result_ = data1_ + data2_;
break;
case '-':
result_ = data1_ - data2_;
break;
case '*':
result_ = data1_ * data2_;
break;
case '/':
{
if(data2_ == 0) exitcode_ = DivZero;
else result_ = data1_ / data2_;
}
break;
case '%':
{
if(data2_ == 0) exitcode_ = ModZero;
else result_ = data1_ % data2_;
} break;
default:
exitcode_ = Unknown;
break;
}
}
void operator ()()
{
run();
}
std::string GetResult()
{
std::string r = std::to_string(data1_);
r += oper_;
r += std::to_string(data2_);
r += "=";
r += std::to_string(result_);
r += "[code: ";
r += std::to_string(exitcode_);
r += "]";
return r;
}
std::string GetTask()
{
std::string r = std::to_string(data1_);
r += oper_;
r += std::to_string(data2_);
r += "=?";
return r;
}
~Task()
{
}
private:
int data1_;
int data2_;
char oper_;
int result_;
int exitcode_;
};
大家下去自己去看看这个是怎么去运行的,把模版改一下。
2.3 CP模型记忆
我们上面说了CP模型是三种关系,两个角色,一个交易场所,所以我们采用321原则去记忆。
总结
对于CP模型,可以让我们更好是使用多线程去观察一些现象,也可以更好展示条件变量的作用,希望大家下去多去联系,这篇就讲解到这里了,下篇我们开始讲解信号量。