多线程——同步 | 条件变量 | 基于阻塞队列的生成者消费者模型
- 🏓生产者消费者模型
- 🏸同步概念
- 🏸生产者消费者模型的特点
- 🏓同步的应用
- 🏸条件变量
- 🏸条件变量接口
- 🏓基于阻塞队列的生产者消费者模型
- 🏸pthread_cond_wait(&_cond,&_lock)的第二个参数
- 🏸细节处理
- 🏸生产消费的内容是任务
- 🏸生产者消费者模型高效的体现
- 🏓两个阻塞队列的321模型
- 🏓总结
🏓生产者消费者模型
以生活中消费者生产者为例:
生活中,我们大部分人都扮演着消费者的角色,会经常在超市买东西,比如买方便面,而超市的方便面是由供应商生成的。所以我们就是消费者,供应商就是生产者,而超市就是一个交易场所。
- 将读取数据的线程叫做消费者线程。
- 将产生数据的线程叫做生产者线程。
- 将共享的特定数据结构叫做缓冲区。
超市的供应商肯定不止一家,即使同一种商品的供应上也不止一家,比如有康师傅牌子方便面,还有白象牌子的方便面等等。不同牌子方便面的生产者它们之间的关系是竞争关系,竞争的表现就是互斥。
站在超市的角度,假设只有一块区域是买方便面的,当生产者来供货的时候,只能让一家牌子来供货,否则就会导致这块区域康师傅方便面和白象方便面混着放,对消费者来说很不友好。
- 生产者线程和生产者线程之间是互斥关系。
- 在同一时间只能有一个生产者线程来访问缓冲区。
假设现在超市只有一包方便面了,但是同时来了好多消费者都要买方便面,此时这些消费者之间的关系也是竞争关系,我买上你就买不上了。所以当只有一包方便面的时候,只能有一个买方便面的消费者进入超市。
- 消费者线程和消费者线程之间是互斥关系。
- 在同一时间只能由一个消费者线程来访问缓冲区。
再假设,超市的方便面卖完了,生产者正在给超市供货,而消费者也正在买方便面,那消费者到底买没买到方便面?有可能生产者刚把方便面搬下来,还没来及摆上去,那么消费者就没有买到,也由可能生产者把方便面摆上去了,那么消费者就买到了。所以最好的办法就是生产者供货的时候,不让消费者进来。
在Linux中,缓冲区存放的都是数据,数据是可以覆盖的,比如消费者线程在读取缓冲区中的数据时,数据是"hello world",刚刚读取完"hello",生产者线程把"world"改成了"shanghai",那么消费者线程读取到的就成了"hello shanghai",就出错了。所以最好的办法就是当消费者线程访问缓冲区的时候,生产者线程不能访问缓冲区。
- 消费者线程和生产者线程之间是互斥关系。
- 在同一时间内只有一个线程可以访问缓冲区。
🏸同步概念
- 在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从而有效避免饥饿问题,叫做同步。
就像上篇文章中抢票的互斥代码,在每个线程抢完票以后没有进行延时代替其他处理动作时,所有票都被一个线程抢了,其他线程没有机会抢。
- 由于竞争能力弱而缺乏调度的线程就处于饥饿状态。
而同步就是让所有线程按照一定顺序来抢票,做到雨露均沾,避免线程饥饿问题产生。具体如何实现后面会详细讲解。
继续拿超市来说,生产者不能无休止的向超市中供货,否则消费者无法进来消费,最终方便面会放不下。同样,消费者也不能无休止来买方便面,否则生产者进不来,方便面就会卖完,而且没有人来供货。
所以最好的办法就是生产者供货,当货架摆满了就不供货了,让消费者来买,当方便面卖完了再让生产者来供货,从而让消费者和生产者协同起来。
- 消费者线程和生产者线程之间又是同步关系。
- 生产者线程和消费者线程按照一定顺序去访问缓冲区。
根据上面例子和分析,对于生产者消费模型的本质可以总结为321:
- 3种关系:生产者和生产者(互斥关系),消费者和消费者(互斥关系),生产者和消费者(互斥&&同步关系)。
- 2种角色:生产者和消费者
- 1个交易场所:一段特定结构的缓冲区
只要我们使用生产者消费者模型,本质工作就是在维护321原则。
🏸生产者消费者模型的特点
有了超市这个交易场所,生产者只要给超市供大量的货即可,比如几万包方便面,不用关心是消费者什么时候来买,只需要专注自己的生成即可。
对于消费者而言,只需要直接去超市买方便面就行,也不用等待方便面的生产。
超市只需要做的就是方便面卖完了,告诉生产者来上货,然后告诉消费者来买。消费者和生产者完全独立,不存在仍然交集。
- 生产者消费者模型实现了消费者线程和生产者线程之间的解耦。
我们平时写的C/C++代码,如果将main函数看成是生产者,普通函数看出是消费者,那么它两就存在高度耦合。
当执行func目标函数的时候,main函数在等待,只有func执行完毕以后main函数才能继续执行下去。如果将这两个函数看出两个执行流,那么它们就存在高度耦合。
而生产者消费者模型就成功的让生产者执行流和消费者执行流解耦了,生产者只管向缓冲区生产数据,消费者只管从缓冲区消耗数据,不用关心对方的状态。
大部分人在周一到周五上班,在周六日休息,上班时候时间比较少,去超市消费的人也比较少,由于消费者和生产者互斥,所以就可以让生产者在周一到周五的时候来上货。
当周六日消费者休息的时候,去超市消费的人就比较多,方便面也卖的比较快,但是由于生产者供货量足够,所以并不会因为买的人多了就不够了的情况。
- 生产者消费者模型支持生产者线程和消费者线程忙闲不均的问题。
- 因为缓冲区能够缓存一定量的数据。
我们买东西肯定不会直接去找供应商,因为人家不零售,因为生产者如果零售的话,每次开机器就仅生成几包方便面,成本高,效率低。
对于消费者而言,直接去找生产者还需等待生成者完成商品生成,消耗时间成本高,效率也低。
- 生产者消费者模型提高了了生产者线程和消费者线程的执行效率。
🏓同步的应用
同步是为了让多线程按照一定顺序互斥访问临界资源,在上面的生产者消费者模型中,如何实现同步呢?
🏸条件变量
- 条件变量:用来描述某种临界资源是否就绪的一种数据化描述
当一个线程互斥地访问某个临界资源时,它可能发现在其它线程改变状态之前,它什么也做不了。
例如,存在一个共享的队列,生产者线程负责生产数据到队列中,消费者线程负责从队列中读取数据,当消费者线程发现队列为空时就不要再去竞争锁去访问对了了,而是应该等待,直到生产者线程将数据生成到队列中。
- 要想让消费者线程等待,就需要使用到条件变量。
那么条件变量是什么呢?继续拿超市举例:
假设现在超出的架子上一次只放一包方便面,只有这包方便面被人买走了,才会放上新的方便面。
此时来了一堆消费者消费者都要买方便面,因为只有一包,所以只能去竞争了,那些竞争能力强的才能买上方便面,甚至不停的抢不停的买,此时那些竞争能力弱的消费者就会始终都买不到方便面。
- 竞争能力弱的消费者就会始终抢不到锁,就会产生饥饿问题。
为了维持秩序,超市的工作人员设置了一个等待区,所有消费者都在这里排队购买,方便面被摆出来了,工作人员让一个消费者进去买,没有摆出来就等着。如果消费者想买两包甚至多包,只能重新排队。
等待区及工作人员就相当于条件变量。
多线程互斥访问一个临界资源时,为了让这些线程按照一定顺序访问,将这些线程都放在条件变量的等待队列中,当另一个线程让条件变量符合条件(唤醒线程)时,队列中的第一个线程就去访问临界资源。
🏸条件变量接口
条件变量同样是由POSIX线程库维护的,所以使用的是POSIX标准,和互斥锁的接口非常相似。
创建条件变量:
pthread_cond_t cond;
- 同样要加pthread_。
- cond是英文condition的缩写。
条件变量的初始化,释放:
#include <pthread.h>
int pthread_cond_destroy(pthread_cond_t *cond);
int pthread_cond_init(pthread_cond_t *restrict cond,const pthread_condattr_t *restrict attr);
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
具体使用参照互斥锁,只是传递的参数是创建好的条件变量。
放入条件变量的等待队列:
int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);
- 第一个参数:创建的条件变量地址。
- 第二个参数:互斥锁的地址,这个必须有,后面本喵再讲解为什么必须传锁。
- 返回值:放入条件变量的等待队列成功返回0,失败返回错误码。
- 接口的作用:调用该接口的线程放入传入条件变量的等待队列中。
唤醒条件变量等待队列中的一个线程:
int pthread_cond_signal(pthread_cond_t *cond);
- 参数:所在等待队列的条件变量地址
- 返回值:唤醒成功返回0,失败返回错误码
- 接口作用:由另一个线程(通常是主线程)唤醒指定条件变量等待队列中的一个线程。
唤醒条件变量等待队列中的所有线程:
int pthread_cond_signal(pthread_cond_t *cond);
- 参数:所在等待队列的条件变量地址
- 返回值:唤醒成功返回0,失败返回错误码
- 接口作用:由另一个线程(通常是主线程)唤醒指定条件变量等待队列中的所有线程。
将条件变量用到我们之前抢票的代码中,实现多线程按照一定顺序互斥抢票。
- 创建全局的条件变量,全局共享资源,全局互斥锁。
- 每个线程一抢上锁以后就进入条件变量的等待队列。
- 主线程每个一秒钟唤醒一个等待的线程进行抢票。
这5个线程按照线程5,线程4,线程3,线程2,线程1的顺序抢票。
- 每个线程都会被先挂起到等待队列中,等待主线程的唤醒。
- 唤醒一个线程抢完票以后会继续进入等待队列,并且排在队列的后面。
如果不使用同步,就会只有一个线程在抢票,其他线程就会处于饥饿状态。
使用pthread_cond_broadcast()接口一次唤醒在条件变量下等待的所有线程进行抢票,每隔一秒唤醒一次。
仍然是按照一定顺序抢票,只是进行抢票的线程是5个同时进行。
🏓基于阻塞队列的生产者消费者模型
上图所示就是要实现的模型,有一个生产者线程,一个消费者线程,还有一个阻塞队列。
- 阻塞队列使用C++容器中的queue来实现。
- 阻塞队列是公共资源,所以要保证它的安全,线程A和线程B要互斥访问,只需要把锁就能实现生产者和消费者,生产者和生产者,消费者和消费者之间的互斥。
- 阻塞队列中有数据消费者才能读,此时生产者不能进行生产,生产者线程要进入它的等待队列中。
- 阻塞队列中没有数据或者不满时,生产者才能进行生产,消费者在生产的时候不能读,要进入它的等待队列。
阻塞队列结构体:
创建一个类,复用C++容器中的队列。
- 维持3种关系,使用到一个互斥和,两个条件变量。
- 给阻塞队列设置一个上限,防止生产者线程无休止的生产数据。
在构造函数中初始化一个互斥锁和两个条件变量,在析构函数中释放一个互斥锁和两个条件变量。阻塞队列的容量给一个缺省值。
生产者生产数据:
- 生产者线程调用生产数据接口时,先持有锁进入临界区访问阻塞队列。
- 当生产条件不满足(阻塞队列满)时,在生产者条件变量的等待队列中挂起。
- 当生产条件满足时,生产数据到阻塞队列中,并且唤醒消费者线程来消费数据。
消费者线程是由生产者线程唤醒的,当阻塞队列中为空时,消费者线程全部都在等待,所以生产者线程生产了数据后就要唤醒消费者线程来消费,防止阻塞队列生产满。
消费者消费数据:
- 输入型参数使用const引用类型,例如push接口,使用const T& in。
- 输出型参数使用指针变量,如上图代码中,使用T* out。
- 消费者线程先持有锁进入临界区访问阻塞队列。
- 当阻塞队列为空,不满足消费条件时,挂起等待。
- 满足生产条件后消费数据,并且唤醒生产者线程来生产数据。
生产者线程是由消费者线程唤醒的,当阻塞队列满了后,生产者线程无法生产,只有消费者线程消费了数据生产者才能进行生产。
🏸pthread_cond_wait(&_cond,&_lock)的第二个参数
在使用pthread_cond_wait接口的时候,必须传入第二个参数。
线程在条件变量的等待队列中等待时,要知道它排队的目的,而排队的目的就是要拿到要访问临界资源的那把锁,从而进去临界区。
- 该函数的第二个参数必须是正在使用的锁。
从代码中可以看到,在线程访问阻塞队列的时候,首先要做的事情就是申请锁,然后才能进入临界区。
当该线程发现自己不满足条件而需要挂起等待时:
- 按照我们之前的认识,该线程就会抱着锁进入条件变量的等待队列中等待。
- 此时其他线程即使被唤醒了,也无法拿到锁,从而无法进入临界区访问临界资源。
pthread_cond_wiat接口的实现者也考虑到了这个问题,所以在持有锁的线程进入队列等待时会自动释放自己持有的锁,让其他线程申请。
int pthread_cond_wait(pthread_cond_t* cond, pthread_mutex_t* lock)
{
//挂起灯操作
pthread_mutex_unlock(lock);//释放锁
//记录锁
}
如上面所示的伪代码那样,而且我们知道,释放锁的操作也是原子的,所以锁很安全。
- 该函数调用的时候,会以原子性的方式,将锁释放,并将自己挂起。
除此之外,该接口还会记录下当前现在在挂起时释放的锁。因为在唤醒线程的时候,pthread_cond_signal(pthread_cond_t* cond)只有一个参数,此时醒来的线程要知道它该去申请哪把锁。
- 线程在被唤醒返回的时候,会自动的重新获取挂起时传入的锁。
此时我们就可以使用这个基于生产者消费者模型的阻塞队列了:
在主线程中new一个阻塞队列在堆区,创建生产者线程和消费者线程时把阻塞队列传过去,让其成为公共资源。
生产者线程将随机数生成到阻塞队列中,消费者线程从阻塞队列中读取数据。
- 此时生产者线程每隔1秒生成一个数据到阻塞队列中,消费者线程没有延时。
最开始阻塞队列里是空的,消费者线程只能在等待队列中等待,当生产者线程生成好数据到阻塞队列中后,消费者线程满足条件才开始消费数据。
- 消费者线程虽然没有延时,可以疯狂消费,但是也得等生产者生成好数据到阻塞队列中才行,否则只能等待。
- 所以表现出来的就是生产者生成什么,消费者紧跟着消费什么。
同样让消费者每一秒钟消费一次,生产者没有延时:
生产者没有延时,可以疯狂生成,所以第一时间将阻塞队列放满数据。消费者每隔一秒消费一个数据,按照生产的顺序去消费。
- 阻塞队列满了以后,消费者每消费一个数据,生产者才能生产一个数据,在消费者延时期间,生产者只能等待。
生产者和消费者都不进行延时,并且将阻塞队列的容量扩大到5000,此时消费者和生产者进行的活动的顺序就无法预测了,但是仍然符合321原则。
🏸细节处理
使用while代替if判断:
假设现在有多个生产者线程在进行数据生产。
- 当阻塞队列满了以后,所有生产者线程都会在条件变量的等待队列中等待。
- 某个线程调用挂起接口失败
- 消费者线程一次唤醒所有生产者线程
- 上面两种情况,生产者线程都会接着向下执行,会生成数据到阻塞队列中。
如果是挂起失败,而且阻塞队列满了,那么生产数据到阻塞队列就会出问题。如果是一次唤醒所有生产者线程,但是只消费了一个数据,阻塞队列中只有一个空位置,那么多个生产者线程生产向阻塞队列中生成数据同样会出问题。
上面这种情况被叫做伪唤醒。
此时最好的解决办法就是让生产者线程在向下执行的时候再进行一次判断!!!
此时如果某个线程挂起失败就会再次进行判断然后再次挂起而不会向下执行,造成阻塞队列生成出错。
如果是所有生产者线程被唤醒,这些线程也会再次进行判断是否符合生成条件,因为是互斥访问,所有当某一个线程将唯一的空位置生成数据后,其他线程又不满足生成条件,就会再次被挂起。
消费者线程同理。
同步中的解锁可以在唤醒前也可以在唤醒后:
当生产者线程生成完毕以后,需要解锁,好让其他线程申请锁。在生成完毕后还有唤醒消费者线程进行消费。
- 可以先解锁再唤醒消费者,也可以先唤醒消费者再解锁。
- 一般建议先唤醒再解锁。
🏸生产消费的内容是任务
生产者消费者模型实际上并不是仅仅用来生产消费整型数据的,它更多的是处理任务的。
创建一个计算任务的类,包括加减乘除取模几种计算功能。在类中的仿函数调用回调函数执行具体的计算逻辑,还有一个显示任务的接口。
具体运算逻辑上图代码所示,其中除法和取模运算发生除0错误时,返回-1,并且打印错误信息。
主线程只需要在创建阻塞队列的时候,将阻塞队列实例化成CalTask类型。其他和之前一样,不用动。
生产者线程将要计算的两个数,以及功能指定给计算任务,创建任务对象,然后生成到阻塞队列中。
消费者线程从阻塞队列中读取任务,然后使用仿函数回调计算函数,得到结果。
可以看到生成的任务内容和消费的任务结果,而且是一一对应的。
将阻塞队列中数据改成计算任务的过程中,生产者消费者模型的阻塞队列并没任何修改。
上面只有一个消费者线程和一个生产者线程,可以让这两种角色都有多个来执行计算任务:
只需要在线程中创建3个生产者线程,2个消费者线程。
此时开始运行打印出来的信息就不整洁了,因为多个线程在同时运行,但是还是按照321原则在运行。
🏸生产者消费者模型高效的体现
前面在分析生产者消费者模型时,一直都在说该模型高效,那么到底体现在什么地方呢?
- 多个生产者线程向阻塞队列中生成数据,多个消费者线程从阻塞队列中消费数据。
- 该模型的三种关系决定了访问阻塞队列的线程同一时间只有一个。
尤其是上面代码现象中,消费和生成是一前一后的,对于阻塞队列的访问是串行的,凭什么说这个模型是高效的呢?
在生产者线程和消费者线程中,访问阻塞队列临界资源的代码都只有一条,如上图代码所示,只有临界区的代码才是串行访问的。
- 除了临界区的代码,其他部分代码所有线程都是并发执行的。
实际的线程中,临界区之外的代码会有很多,而且有可能会非常耗时,但是这些代码是可以多线程并发执行的,该模型的效率就会很高。
生产者消费者模型的高效体现在:非临界区的代码,多线程可以并发执行。
该模型的高效并不体现在对阻塞队列(临界资源)的访问上。
🏓两个阻塞队列的321模型
使用上面的基于阻塞队列的生产者消费者模型,将消费者处理完的计算任务保存成日志到磁盘上:
- 两个阻塞队列,一个阻塞队列用来处理计算任务,一个阻塞队列用来处理保存任务。
- 此时中间的线程既是消费者也是生产者。
增加保存任务类:
保存日志信息类只有日志信息字符串成员以及回调函数。
在将日志信息保存到磁盘的实现函数中,使用C语言接口,将日志以追加方式保存到磁盘上,每次保存后换行。
增加保存线程:
为了能让所有线程看到阻塞队列,尤其是中间的消费者,它需要同时看到计算任务和保存任务的阻塞队列,将这两种类型的阻塞队列指针封装在一个类中,相当于一个大结构体。
将两种类型的阻塞队列全部建立在堆区,然后让上面大结构体中的指针指向这两种阻塞队列。
- 大结构体建立在主线程的栈区,主线程在所有线程结束之前不会结束,所以资源可以共享。
然后创建多个生产者线程,多个消费者线程,以及一个保存任务的线程。
新线程实现:
生产者线程只是生成任务,并且将任务对象放入计算任务的阻塞队列中,所以它只需要拿到计算任务阻塞队列的指针即可。产生任务的逻辑和前面只有一个阻塞队列的一样
消费者线程即使消费者,也是生成者,所以需要拿到计算任务的阻塞队列和保存任务的阻塞队列两个队列的地址,如上图红色框所示。
先从计算任务的阻塞队列中读取计算任务并进行处理(消费者),然后将日志信息放入到保存任务的阻塞队列中(生产者)。
最后的线程从保存任务的阻塞队列中读取任务,然后调用仿函数将日志信息保存到磁盘。
- 保存到磁盘的方法是被消费者线程指定的。
可以看到,生成任务完成后会放入到计算任务的阻塞队列中,然后消费者会读取任务并处理并且将日志信息放入到保存任务的阻塞队列中,保存任务的线程会读取任务,将日志保存到磁盘中。
而且当前目录下多了一个日志文件,里面的内容就是消费者处理的任务及结果。
这个过程中,我们最初写的基于阻塞队列的生产者消费者模型始终没有动过,只要有这个模型我们可以创建多个队列处理多种类型的任务。
🏓总结
在互斥的前提下同步才有意义,并且同步能避免线程饥饿,而同步的实现主要是通过条件变量,不符合条件的线程放入到等待队列中,当条件符合后再唤醒。
基于阻塞队列的生产者消费者模型充分应用了线程的同步与互斥,该模型是一个大杀器,可以在很多场景中区应用。