目录
线程概念
1.什么是线程?
2.线程的优缺点
3.线程异常
4.线程用途
线程操作
1.如何给线程传参
2.线程终止
3.获取返回值
4.分离状态
5.退出线程
线程的用户级地址空间:
线程的局部存储
线程的同步与互斥
互斥量mutex
数据不一致的主要过程:
互斥锁
临界区
线程同步
原子性操作
锁的应用---封装
线程安全
死锁
Linux线程同步
先认识一下与等待有关的接口
条件变量
条件变量的初始化
等待条件满足条件变量
唤醒等待
cp问题(生产者消费者模型)
代码实现生产者消费者模型
线程概念
1.什么是线程?
课本的概念:线程是比进程更加轻量化的一种执行流/线程是进程内部的一种执行流。
我们的概念:线程是cpu调度的基本单位/进程是系统资源承载的实体
我们知道线程的创建过程非常的复杂,构建pcb,构建地址空间,构建页表,构建运行队列,初始化各个字段......,而我们的线程,可以理解为,进程创建之后 创建pcb参与分配资源。因此线程的创建比进程更加简单。其次线程实在进程的地址空间内运行。
而线程在进程的地址空间也需要被管理,因此和进程一样,线程的管理需要先描述出他的所有属性,在组织出它的数据结构,但这样再次设计调度算法,数据结构太过麻烦,而刚好进程与线程其实都是执行流,因此我们直接复用进程的一套东西。
实际上我们在真正去创建使用线程时,与教材上的点可能不太一样,linux下具体的实现方案,教材也不会谈到,而是以抽象的概念介绍。
因此在现在cpu拿到一个pcb它的执行力度时小于等于进程的。在Linux中,其实并不会真正的找到线程,而是使用进程的数据结构模拟的线程,因此这些除了整个进程,pcb构成的数据进程被称为轻量级进程(线程)。
我们先以一段代码看看:
在linux操作系统下,在c++中调用的头文件是pthread.h,创建线程的接口pthread_create():
#include<iostream>
#include<unistd.h>
using namespace std;
//c++创建线程需要的头文件、
#include<pthread.h>
void *start(void *arg)
{
const char*name=(char*)arg;
cout<<name<<endl;
while(true)
{
cout<<"i am a new thread,my name is "<<name<<endl;
sleep(1);
}
}
int main()
{
pthread_t tid;
//创建线程的接口
int n=10;
pthread_create(&tid,nullptr,start, (void*)"thread 1");
while(true)
{
cout<<"i am main thread,i am running"<<endl;
sleep(1);
}
return 0;
}
可以看到,在编译运行时,在进入到main时,就已经创建了进程了,在进程中,我们有创建了一个线程用来执行一段代码。这也就侧面说明了线程是进程中的一种执行流。
此时我们在主函数和线程调用的函数同时打印pid,可以看到是同一个进程..
那么现成的调度应该看神呢呢,进程的调度我们看pid,那么线程呢?
使用ps -aL 指令查看进程时,我们会发现还有一个LWP,而线程的调度就是看LWP,
如果更想清晰的观察,我们还可以创建多个线程都来执行这个代码,在ps -aL查看旧就能清楚看到pid与LWP.
线程更新为什么比进程快呢?
线程到线程切换,本质就是pcb->pcb数据切换,地址空间,页表等不变,而pcb->pcb之间也是寄存器产生的临时数据的切换,不需要切换所有的寄存器,只需要局部的寄存器的数据的切换。
其次cpu上除了集成了寄存器,还有其他,比如一块较大的缓存(cahce),通过缓存,由于局部性原理的预加载机制,我们的要切换之前的数据可能就被放到了缓存中,此时的cpu继续访问内存是,就直接存缓存中读取数据。
总结:1.使用的寄存器少 2.不需要从新更新缓存。
对于操作系统,无论是物理内存,文件在磁盘上的加载,都是以4kb的大小呈现,以4kb为基本大小对数据加载,物理内存中以4kb为一个页框,而整个物理内存中有很多页框,这些页框有没有被使用,即需要管理这些页框-先描述在组织,在内核中会为其维护一个数据结构,strcut page,不是很大,主要看flag。
对与虚拟地址,一般为32位,在使用虚拟地址是被分为三部分,前十位用作页目录的映射,中间十位用做二级页表映射的就是物理内存的页框,也就是前20位用来找页框,最后用页框的起始地址加上最低的十二位(页内偏移)来找到我们的物理内存。
根据页框目录,找到页框,再从页框里的呢容找到叶匡的起始地址,再根据起始地址加上12位低地址找到物理内存的地址。整个转化过程在cpu上集成的MMU上已经转化完成了。
所以划分页表的本质就是划分虚拟地址。
2.线程的优缺点
优点:
1.创建,调度,释放的量级更轻
2.可利用多处理器的可并行数量,可以被多个执行
3.计算密集型应用可以分解到多个线程运行。
4.占用资源少
缺点:
3.线程异常
4.线程用途
线程操作
首先,需要注意的是,在Linux系统下没有真正的线程,只是进程模拟出来的轻量级进程,那么我们在使用的时候,也没有线程的一些列接口操作,有的只是进程。因此在应用层提供了一套线程的原生库pthread,该库中提供了线程一系列的操作方法。
这也是Linux系统设计这的特殊设计。因此在编译时,我们需要指明我们的库pthread。
1.如何给线程传参
pthread_create创建线程
对于参数,第一个为线程id,本质上就是一个长整型,第二参数我们一般为空,第三个为线程执行的接口,第四个为接口的参数,参数会被接口回调。
以创建多进程为例,
using funct=function<void()>;
class ThreadData{
public:
ThreadData(const string &Name,const uint64_t &Time,funct f ):name(Name),time(Time),fc(f)
{}
string name;
uint64_t time;
funct fc;
};
void *ThreadRotine(void *args)
{
ThreadData *data=static_cast<ThreadData*>(args);
cout<<"获取到线程数据,threadname:"<<data->name<<",threadtime:"<<data->time<<",调用接口:"<<endl;
data->fc();
sleep(1);
}
void func()
{
cout<<"i am a part of action"<<endl;
sleep(1);
}
#define N 5
int main()
{
vector<pthread_t> pthreads;
pthread_t tid;
//创建多个线程
for(int i=0;i<N;i++)
{
char threadname[20];
snprintf(threadname,sizeof(threadname),"%s-%d","thread",i+1);
ThreadData *t=new ThreadData(threadname,uint64_t(nullptr),func);
pthread_create(&tid,nullptr,ThreadRotine,t);
sleep(2);
}
//创建完成之后,保存tid,
pthreads.push_back(tid);
while(true)
{
cout<<"main thread running...."<<endl;
sleep(1);
}
}
此时运行代码,我们可以看到正常运行:
但是我们说过,编写线程代码我们需要小心仔细,线程的鲁棒性较低,例如这里我们让其中一个线程出现除零错误:
此时所有的线程就全崩掉了。
2.线程终止
由上述我们基本了解到了线程的使用,那么初次之外,如何终止线程呢?方法也恒简单:
1.直接在线程接口给出返回值,例如返回一个空指针。
2.exit()直接终止,但是不仅仅是线程,进程也会退出!!
3.因此线程库也有提供的接口pthread_exit(),用于终止线程。
3.获取返回值
线程退出默认要被等待,如果线程退出没被等待,子也可能会产生与进程一样的僵尸问题,其次,线程退出时,主线程如何获取新线程的退出信息。
pthread_join 用于等待子线程终止
返回值为整形退出码,参数一为tid,参数二为一个二级指针,用来接收线程接口执行完的的返回值,但我们看到线程函数的返回值为void*,而这里是void**,
string Tohex(pthread_t id)
{
char tid[64];
snprintf(tid,sizeof(tid),"0x%x",id);//以十六进程写
return tid;
}
void *ThreadRotine(void *args)
{
string name=static_cast<const char*>(args);
int cnt=5;
while(cnt--)
{
cout<<"获取到线程数据,threadname:"<<name<<",线程地址"<<Tohex(pthread_self())<<endl;
sleep(1);
}
//return nullptr;
pthread_exit(args); //两个效果一样
return args;
}
int main()
{
vector<pthread_t> pthreads;
pthread_t tid;
pthread_create(&tid,nullptr,ThreadRotine,(void*)"thread 1");
cout<<"main thread running...."<<",线程地址"<<Tohex(pthread_self())<<endl;
sleep(5);
//参与线程终止 ,以阻塞状态一直等待
void*ret=nullptr;
int n=pthread_join(tid,&ret);
cout<<"get new thread data:"<<(char*)ret<<endl;
cout<<"main thread done "<<"exit code:"<<n<<endl;
sleep(5);
return 0;
}
可以看到,我们设置ret指针,通过接口pthread_join我们可以拿到线程执行(这里就是threadrotine)的返回值,之后将该返回值给给我们的ret,并且完成线程等待,这也是为什么参数二是一个输出型参数。
那么如果线程异常呢?返回拿到的是什么?实际上异常时,就会返回-1;
其次返回值的类型为void*,即任意类型的数据都可以传参,返回给我们。
4.分离状态
线程模式默认是joinable状态,但是 线程可以被设置分离状态,如果有一个线程此时我们不管不顾,也没有等待,但我们希望退出时,他的资源都会被回收,此时就可以把该线程设置为分离状态。
线程” 分离“不” 分家“,虽然主线程不管分离的线程,分离状态的线程退出,运行结果都不知道,但是分离状态的线程出现了问题,还是会影响到主线程的。
一般我们这样设置:
pthread_detach(pthread_self());
此时pthread_join去等待时,等待失败,返回值为22,不需要去等待.
5.退出线程
当子线程一直在运行时,作为主线程,我们想去结束掉子线程,此时就可以调用接口,pthread_cancel:
参数为线程的id。
总结:
线程的用户级地址空间:
由于线程整个是一个库,而库的使用是加载到内存当中,那么加载到内存中的那一部分呢?由于该库是对线程做管理,即就是对用户级内存的管理,加载到共享区(堆栈之间)。
void running()
{
int cnt=5;
while(cnt--)
{
cout<<"i am a thread,i am running "<<endl;
sleep(1);
}
}
int main()
{
//定义线程对象
thread t(running);
//等待线程
t.join();
while(true)
{
cout<<"i am main thread"<<endl;
sleep(1);
}
return 0;
}
在编译的时候每有问题,可是到了链接还是报错,此时还需要我们在编译时 -lpthread,指明库。
实际上这些语言支持的多线程,都是用的原生的线程库,故此还需要指明库文件。之后运行与系统的多线程一摸一样。
线程的局部存储:
其次,当我们对全局变量加上__thread修饰,此时每个线程用该变量都是独一个全局变量。__thread只能在内置类型上使用。
注意的点:
线程可以fork()创建子进程,其次也可以进行进程替换(但是会影响自身及其他线程)。
对于用户级执行流与内核级的lwp是1:1的。此外tid是系统内核级别的,用户及时接触不到的。
其中较为重要的就是线程栈,线程栈决定了一个线程本身执行流。每一个线程都有自己的线程栈,虽然如此,但线程与线程之间是可见的,虽然独立,但不私有,我们可以获取线程栈中的数据,升值修改。对于全局变量,所有的线程都可以看到,且地址一样。
线程的局部存储
在线程中,还提供了一个编译选项__thread,在定义变量时,我们可以用__thread修饰,修饰之后定义的全局变量,每一个线程都是单独的一份了,相互不影响。需要注意的是只能对内置类型使用。
线程的同步与互斥
临界资源:多线程执行流共享的资源就叫做临界资源
临界区:每个线程内部,访问临界资源的代码,就叫做临界区
互斥:任何时刻,互斥保证有且只有一个执行流进入临界区,访问临界资源,通常对临界资源起保护作用 原子性(后面讨论如何实现):不会被任何调度机制打断的操作,该操作只有两态,要么完成,要么未完成。
互斥量mutex
int tickets =100;
struct threadData
{
threadData(int id)
{
name="thread_"+to_string(id);
}
string name;
};
void *getticks(void *args)
{
threadData * data=static_cast<threadData*> (args);
while(true)
{
if(tickets>0)
{
usleep(1000);
cout<<data->name<<"get a tiket:"<<tickets<<endl;
tickets--;
}else
{
cout<<"tickets is null"<<endl;
break;
}
}
cout<<"get tickets quit,name:"<<data->name<<endl;
return nullptr;
}
int main()
{
//一个线程代表一个用户
vector<pthread_t> tids;
vector<threadData *> thread_datas;
for(int i=0;i<Num;i++)
{
//创建线程
pthread_t id;
threadData *tmp=new threadData(i);
pthread_create(&id,nullptr,getticks,tmp);
tids.push_back(id);
thread_datas.push_back(tmp);
}
for( int i=0;i<tids.size();i++)
{
pthread_join(tids[i],nullptr);
}
for( int i=0;i<tids.size();i++)
{
delete thread_datas[i];
thread_datas[i]=nullptr;
}
return 0;
}
四个用户同时进行抢票,抢100张票,此时我们在看看执行结果:
我们很清楚的看出一些问题:
首先就是抢相同的票,其次,命名判断tickets<0旧停止,还会有抢到负数的。
此时对于这段代码,tickets就是全局数据,也是线程共享数据,我们的四个进程并发的去访问这些数据。
对于上述的问题,tickets最后变为负数,这种现象被称为由于共享数据而产生的数据不一致问题(即此时我在进行数据操作时,你也在做数据操作)。所以对于ticktes-- /++这种操作是不安全的。
数据不一致的主要过程:
cpu先从内存读取到数据ickets,之后在cpu中做--操作,之后在写回内存。每一步都有对应的汇编操作,线程在执行时将共享数据加载到内存当中,把数据内容(通过拷贝的方式)变成了自己的上下文,此时别的线程也来干这种操作,线程2先成功的把数据改完并返回给cpu,但是此时线程2辛运,分给他的时间片也多,于是线程2执行了多次之后结束了,带着结果走了,此时又回到了第一个线程,线程一,此时并不是立马继续先前的工作,而是先恢复上下文数据(此时的数据还是之前保存的数据),再进行cpu的操作,此时线程1还认为该数据没有变,实际上线程2已经执行多次了,于是就出现了数据不一致的问题。
既然出现了这种问题,那么我们如何来解决呢?
互斥锁
了解互斥锁,就先了解一下源生库中提供的关于互斥锁的接口:
对于类型pthread_mutext_t 时库中提供的一种类型。
除了这种方式外,也可以直接定义一把全局的锁,之后就不需要初始化,不需要释放。定义方式就是上图中的最后一行。
由接口名也能看到分别有销毁和初始化,通过该接口,我们创建锁。之后便是“上锁”,需要另一个接口:pthread_mutex_lock
pthread_mutex_lock 就是加锁,pthread_mutex_unlock就是解锁。
临界区
之前已经说过,我们的共享资源线程同时访问,会出现数据不一致,这份资源我们也叫临界资源,对于临界资源我们是需要加锁的,因此我们也只需要对之访问了临界资源的那一小部分的临界区加锁。加锁的原则也是代码越小,区域越小更好。
加锁之后,就会限制每次只有一个线程访问,串型访问。
加锁之后的抢票系统:
#define Num 4
//利用多线程模拟一轮抢票
int tickets =100;
struct threadData
{
threadData(int id, pthread_mutex_t * tmp)
{
name="thread_"+to_string(id);
Lock=tmp;
}
string name;
//添加锁
pthread_mutex_t* Lock;
};
void *getticks(void *args)
{
threadData * data=static_cast<threadData*> (args);
while(true)
{
//加锁
pthread_mutex_lock(data->Lock);
//线程到锁这里,如果申请锁成功就执行,否则就阻塞等待
if(tickets>0)
{
usleep(1000);
cout<<data->name <<" get a tiket make : "<<tickets<<endl;
tickets--;
//解锁
pthread_mutex_unlock(data->Lock);
}else
{
cout<<"tickets is null"<<endl;
//解锁
pthread_mutex_unlock(data->Lock);
break;
}
//合适的sleep使得 线程不会一释放完就去拿锁 而是其他线程去拿锁
usleep(15);
}
cout<<"get tickets quit,name:"<<data->name<<endl;
return nullptr;
}
int main()
{
//创建锁
pthread_mutex_t lock;
pthread_mutex_init(&lock,nullptr);
//一个线程代表一个用户
vector<pthread_t> tids;
vector<threadData *> thread_datas;
for(int i=0;i<Num;i++)
{
//创建线程
pthread_t id;
threadData *tmp=new threadData(i,&lock);
pthread_create(&id,nullptr,getticks,tmp);
tids.push_back(id);
thread_datas.push_back(tmp);
}
//当不用的时候,销毁锁
pthread_mutex_destroy(&lock);
for( int i=0;i<tids.size();i++)
{
pthread_join(tids[i],nullptr);
}
for( int i=0;i<tids.size();i++)
{
delete thread_datas[i];
thread_datas[i]=nullptr;
}
return 0;
}
修改代码之后,此时我们在运行,首先看到确实不会抢到负数了,但是此时票只有一个进程在抢。但当我们在线程抢完票之后,休眠个十几毫秒,此时就能看到多个进程都在抢,这是因为如果不休眠,某个线程拿到了锁之后,执行完代码,释放锁之后立马再去申请锁,导致其他线程拿不到锁
,一次你会看到只有一个线程抢票。而且在现实中也不可靠,我们不会抢完一张票非常快速再去抢一张。
线程同步
所谓的线程同步,就是让所有的线程按顺序,平均的申请锁,获取资源。例如上述抢票,如果我们设置一个观察员,首先让没有拿到锁的线程在外面排队,让拿到锁的线程,获取资源之后,释放锁,然后排到队尾,以这种方式来竞争资源,更加的有顺序性。
原子性操作
锁本身就是共享资源,申请锁和释放锁本身就被设计成原子性操作。那么如何做到的呢?
在临界区中时,线程可以被切换,不过把锁带走了,即使我不在了,也没有其它线程会去访问资源。对于线程,要么就没锁,要么就释放锁,因此对于当前线程访问临界区时,对于其他线程是原子的。
对与每一条汇编语句 ,我们认为他也是原子。
上锁与解锁的本质:
实际上加锁也并不是好的,这是利用时间来保证安全。
锁的应用---封装
在c++中有一种利用对象生命周期能更好的管理资源的思想--RAII,直接在代码的临界区上锁解锁没问题,但书写起来,看起来都不太好,此时我们可以将需要调用的接口都封装在一个类中,通过构造与析构调用对应的上锁,解锁。此时我们再创建一个全局的锁,此时代码就更加轻便。
class Mutex{
public:
Mutex(pthread_mutex_t* lock):_lock(lock)
{}
void Lock()
{
pthread_mutex_lock(_lock);
}
void Unlock()
{
pthread_mutex_unlock(_lock);
}
~Mutex()
{}
private:
pthread_mutex_t* _lock;
};
class LockGuard
{
public:
LockGuard(pthread_mutex_t* lock):_lockguard(lock)
{
_lockguard.Lock();
}
~LockGuard()
{
_lockguard.Unlock();
}
private:
Mutex _lockguard;
};
//代码更加简便
void *getticks(void *args)
{
threadData * data=static_cast<threadData*> (args);
while(true)
{
//用一个花括号表示临界区
{
LockGuard lockguard(&lock);
//创建完成时自动调用构造与析构完成 上锁与解锁
//创建在临界区 在该区域进行线程上锁
if(tickets>0)
{
usleep(1000);
cout<<data->name <<" get a tiket make : "<<tickets<<endl;
tickets--;
}else
{
break;
}
}
//合适的sleep使得 线程不会一释放完就去拿锁 而是其他线程去拿锁
usleep(15);
}
cout<<"get tickets quit,name:"<<data->name<<endl;
return nullptr;
}
线程安全
首先来看看两个概念:
线程安全:多个线程并发同一段代码时,不会出现不同的结果。常见对全局变量或者静态变量进行操作, 并且没有锁保护的情况下,会出现该问题。重入:同一个函数被不同的执行流调用,当前一个流程还没有执行完,就有其他的执行流再次进入,我们 称之为重入。一个函数在重入的情况下,运行结果不会出现任何不同或者任何问题,则该函数被称为可重 入函数,否则,是不可重入函数
死锁
死锁指的是一组进程中各个进程占用不会释放的资源,但因互相申请被其他进程给占用不会释放的资源而出一种永久等待的状态。
死锁的四个必要条件:
1.互斥条件:每一个资源每次只有一个执行流使用(前提)
2.请求与保持条件:一个执行流因请求资源而阻塞时,不释放已占有的资源。
3.不剥夺条件:一个执行流已经获得资源时,在未使用完成前,不能被剥夺。
4.循环等待关系:若干执行流形成了一种头尾相接的循环等待资源的关系。
死锁问题的避免:
想要避免死锁,就要破坏四个必要条件:
1.互斥条件一般不可避免
2.请求与不保持条件:一个执行流因请求资源而阻塞时,释放已占有的资源。(pthread_mutex_trylock方式,请求锁失败,直接返回不再等待)(避免锁未被释放)。
3.剥夺条件:在未使用完资源前,可以进行剥夺。资源一次性分配。
4.不形成环路:按顺序申请锁,不交叉的申请锁。
Linux线程同步
先认识一下与等待有关的接口
条件变量
条件变量的初始化
pthread_cond_init 接口是就用来初始化条件变量的。
销毁:pthread_cond_detrory。 也可以定义全局变量,接口的使用与线程锁非常类似。
等待条件满足条件变量
线程会先去队列等待而不是获取资源。
唤醒等待
唤醒在指定条件变量下的线程,从等待队列中。 sgnal是唤醒一个线程,broadcast是唤醒所有线程。
了解了以上接口,我们用代码了解一下:
uint64_t cnt=0;
using namespace std;
pthread_mutex_t mutex=PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond=PTHREAD_COND_INITIALIZER;
void *Count(void *args)
{
//分离线程
pthread_detach(pthread_self());
uint64_t count=(uint64_t)args;
cout<<"pthread create succeed: pthread_"<<count<<endl;
while(true)
{
{
pthread_mutex_lock(&mutex);
//无论是哪个线程,进来时先要去队列等待 sleep
pthread_cond_wait(&cond,&mutex);
cout<<"pthread name :pthread_"<<count<<"is running,:cnt:"<<cnt++<<endl;
sleep(1);
pthread_mutex_unlock(&mutex);
}
}
}
int main()
{
for(uint64_t i=0;i<5;i++)
{
//创建线程
pthread_t id;
pthread_create(&id,nullptr,Count,(void*)i);
sleep(1);
}
sleep(2);
cout<<"i am main thread ,control begin"<<endl;
while(true)
{
//唤醒条件变量下等待的线程,默认一般都是队头的线程
pthread_cond_signal(&cond);
sleep(2);
cout<<"pthread is signal sucessfully"<<endl;
}
}
首先有一个问题就是为什么 先上锁,在进行等待?
实际上pthread_cond_wait接口,有一个参数就是传锁进来,当使用该接口等待时,会自动释放锁。等待之后,我们就需要在主线程再次唤醒她,之后就可以去获取资源了。当然我们当前的资源并不是一个临界资源,如果是临界资源,我们就要去判断:
首先判断资源是否就绪,是否还有资源(临界资源也是有状态的),就是要访问临界资源,也就是加锁之后。 因此决定是否休眠等待,是在加锁与解锁之间。
cp问题(生产者消费者模型)
生产者消费者模型在我们的日常生活也是常见的,生产者 ==供货商,消费者(顾客),超市(缓存),因为有超市的存在,使得生产者与消费者没有直接关系,且使得生产与消费解耦,通过这种方式,资源能得到更好的处理。
在开发过程中,使用生产消费模型更有优点:
超市,或者仓库对于消费者和生产者,都是共享资源,而(共享)临界资源都会存并发问题:
1.生产者与生产者存在竞争,要保证互斥。
2.生产者与消费者 首先要保证互斥关系,其次还要保证同步。
3.消费者与消费者互斥关系。
因此生产与消费模型存在3个关系 ,2个角色,1个场所(特定结构的内存空间)。
代码实现生产者消费者模型
template<class T>class BlockQueue
{
static const int defaltnum=100;
public:
BlockQueue(int maxcapacity=defaltnum):max_capacity(maxcapacity)
{
min_capacity=0;
pthread_mutex_init(&_mutex,nullptr);
pthread_cond_init(&c_cond,nullptr);
pthread_cond_init(&p_cond,nullptr);
low_water=max_capacity/2;
high_water=max_capacity*2/3;
}
T pop()
{
//对数据操作,需要加锁
pthread_mutex_lock(&_mutex);
//判断是否进行等待
if(_q.size()==min_capacity)
{
pthread_cond_wait(&c_cond,&_mutex);
}
T temp=_q.front();
_q.pop();
//消费者消费了,所以一定保证有空余让生产者生产,我们可以等待消费max/2,之后生产
if(low_water>_q.size())
pthread_cond_signal(&(p_cond));
pthread_mutex_unlock(&(_mutex));
return temp;
}
void push(const T& data)
{
//对数据操作,需要加锁
pthread_mutex_lock(&_mutex);
//之后就需要判断等待
if(_q.size()==max_capacity)
{
//进行等待,释放锁,进入阻塞
pthread_cond_wait(&p_cond,&_mutex);
}
_q.push(data);
//生产者保证一定有数据,所以可以唤醒消费者来消费,我们可以等待生产到capacity的2/3,之后一会在消费
if(high_water<_q.size())
pthread_cond_signal(&(c_cond));
pthread_mutex_unlock(&(_mutex));
}
~BlockQueue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&c_cond);
pthread_cond_destroy(&p_cond);
}
private:
std::queue<T> _q; //共享资源
//最大值
T max_capacity;
//最小值
T min_capacity;
//需要对共享资源加锁
pthread_mutex_t _mutex;
//生产消费都是按顺序的,需要实现同步,提供条件变量
pthread_cond_t c_cond;
pthread_cond_t p_cond;
//生产与消费的水位线
int low_water;
int high_water;
};
#include<pthread.h>
#include<unistd.h>
#include<iostream>
#include"BlockQueue.hpp"
using namespace std;
//生产者
void *Producer(void *args)
{
BlockQueue<int> * q=static_cast<BlockQueue<int>*> (args);
//生产
int data=0;
while(true)
{
data++;
q->push(data);
cout<<"生产一个数据:"<<data<<endl;
sleep(2);
}
}
//消费者
void *Consumer(void *args)
{
//消费
BlockQueue<int> * q=static_cast<BlockQueue<int>*> (args);
while(true)
{
int data=q->pop();
cout<<"已消费一个数据 :"<<data<<endl;
}
}
int main()
{
pthread_t prod;
pthread_t cons;
//线程创建的顺序一定是先消费者,再生产者 ,否则生产者一生产消费者就会消费
BlockQueue<int> *queue=new BlockQueue<int>();
pthread_create(&cons,nullptr,Consumer,queue);
pthread_create(&prod,nullptr,Producer,queue);
//等待子线程
pthread_join(cons,nullptr);
pthread_join(prod,nullptr);
delete queue;
return 0;
}
结果: