目录
生产者消费者模型的概念
生产者消费者模型的特点
基于阻塞队列BlockingQueue的生产者消费者模型
对基于阻塞队列BlockingQueue的生产者消费者模型的模拟实现
ConProd.c文件的整体代码
BlockQueue.h文件的整体代码
对【基于阻塞队列BlockingQueue的生产者消费者模型的模拟实现】的测试
对基于计算任务的生产者消费者模型的模拟实现
Task.h的整体代码
ConProd.c文件的整体代码
对【基于计算任务的生产者消费者模型的模拟实现】的测试
多生产者多消费者模型的模拟实现(以及对多生产者和多消费者模型的感悟)
ConProd.c文件的整体代码
对【多生产者多消费者模型的模拟实现】的测试
【单生产者单消费者模型】和【多生产者多消费者模型】的应用场景或者说意义(包括如何选择模型比较合适)
生产者消费者模型的概念
(结合下图思考)生产者消费者模式就是通过一个容器,即容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过容器来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给容器,消费者不找生产者要数据,而是直接从容器里取,容器就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个容器就是用来给生产者和消费者解耦的。
生产者消费者模型的特点
生产者消费者模型是多线程同步与互斥的一个经典场景,其特点如下:
(如果对于线程控制、即线程的同步与互斥感到陌生,请结合<<线程的互斥与同步>>一文进行阅读)
1、生产者消费者模型有三种关系: 生产者和生产者(互斥关系)、消费者和消费者(互斥关系)、生产者和消费者(互斥关系、同步关系)。
2、生产者消费者模型有两种角色: 生产者和消费者。(通常由进程或线程承担)
3、生产者消费者模型有一个交易场所: 通常指的是内存中的一段缓冲区,即一块内存空间,可以自己通过某种方式组织起来。
我们用代码编写生产者消费者模型的时候,主要就是对以上三个特点进行维护。
问题1:那生产者和生产者、消费者和消费者、生产者和消费者,它们之间为什么会存在互斥关系?
答案1:介于生产者和消费者之间的容器可能会被多个执行流(即进程或者线程)同时访问,因此我们需要将该临界资源(即容器)用互斥锁保护起来。其中,所有的生产者和消费者都会竞争式的申请锁,因此生产者和生产者、消费者和消费者、生产者和消费者之间都存在互斥关系。
问题2:那生产者和消费者之间为什么会存在同步关系?
答案2:假如不存在同步关系,即不对生产者和消费者的行为进行控制,那么会有可能出现两种情况。情况1、生产者生产的速度比消费者消费的速度快,那么当生产者生产的数据将容器塞满后,生产者再生产数据(即往容器中插入数据)就会生产失败,因为这里的容器容量是固定的,没有扩容一说。情况2、消费者消费的速度比生产者生产的速度快,那么当容器当中的数据被消费完后,消费者再进行消费就会消费失败。所以为了让生产数据和消费数据都不会出现失败的情况,我们应该让生产者和消费者访问该容器时具有一定的顺序性,比如让生产者先生产,然后再让消费者进行消费,所以生产者和消费者之间才会存在同步关系。
注意在以上理论中,互斥关系保证的是数据的正确性,避免数据因为时序不一致而紊乱;而同步关系是为了让多线程之间协同起来,让生产者线程能成功的完成生产数据、让消费者线程能成功的完成消费数据、避免出现消费和生产失败的情况。
基于阻塞队列BlockingQueue的生产者消费者模型
在多线程编程中阻塞队列(即BlockingQueue、注意本质就是STL的queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,某个消费者线程A从队列获取元素的操作将会被阻塞,直到队列中被另一个生产者线程B放入了元素,消费者线程A才能从阻塞状态恢复成运行状态从而继续从队列中获取元素;当队列满时,某个生产者线程A往队列里存放元素的操作也会被阻塞,直到在另一个消费者线程B中有元素被从队列中被取出,生产者线程A才能从阻塞状态恢复成运行状态从而继续往队列中存放元素。
对基于阻塞队列BlockingQueue的生产者消费者模型的模拟实现
为了方便理解,下面我们以单生产者线程、单消费者线程为例进行实现。
这里说一下,首先说明生产者消费者模型的编写思路,然后说明阻塞队列BlockingQueue的编写思路。
生产者消费者模型的编写思路如下:
在主函数中,首先在堆上new创建一个阻塞队列BlockingQueue的对象,然后创建两个线程(一个充当生产者线程、另一个充当消费者线程)。
创建线程时需要一些参数,所需参数1:需要把阻塞队列BlockingQueue的对象传给两个线程作为两个线程的临界资源(说一下阻塞队列BlockingQueue的对象能被两个线程作为临界资源是因为阻塞队列是在堆上开辟的空间,而堆上的数据是所有线程共享的)。所需参数2:一个是生产者线程函数,将该函数传给生产者线程;另一个是消费者线程函数,将该函数传给消费者线程。函数不能凭空而来,所以此时需要创建这两个函数,才能将函数传给生产者和消费者线程。生产者线程函数的逻辑就是无限循环地把一个每次递增1的整数push进阻塞队列、消费者线程函数的逻辑就是无限循环地从阻塞队列中读出一个数据,每读出一个数据,都要把该数据从阻塞队列中删除。
在创建线程完毕后,立刻对生产者和消费者线程进行线程等待(即调用pthread_join函数)从而防止内存泄漏(类似于防止僵尸进程造成的内存泄漏),立刻编写是为了防止后序遗忘了这个步骤。
ConProd.c文件的整体代码
结合上面思路,包含主函数的ConProd.c文件的整体代码如下。
#include"BlockQueue.h"
#include<pthread.h>
#include<unistd.h>//提供sleep函数
//消费者线程函数
void* consumer(void*args)
{
BlockQueue<int> *bq = (BlockQueue<int>*)args;
int a;//输出型参数
//错误写法如下:
//while(bq->size()!=0)
//正确写法如下:为什么呢?因为队列中没有数据时,会被阻塞、等待生产者线程生产货物后继续消费;而不是按照上面错误写法把交易场所中的货物消费完后就退出
while(true)
{
bq->pop(&a);
cout<<"消费了一个数据:"<<a<<endl;
}
return nullptr;
}
//生产者线程函数
void* productor(void*args)
{
BlockQueue<int> *bq = (BlockQueue<int>*)args;
int a=0;
while(true)
{
cout<<"生产一个数据:"<<a<<endl;
bq->push(a);
a++;
}
return nullptr;
}
int main()
{
BlockQueue<int> *bq = new BlockQueue<int>;
pthread_t c,p;//为线程ID
pthread_create(&c,nullptr, consumer, (void*)bq);
pthread_create(&p,nullptr, productor, (void*)bq);
pthread_join(c,nullptr);
pthread_join(p,nullptr);
return 0;
}
阻塞队列BlockingQueue类的编写思路如下:
首先确定BlockingQueue类内的成员变量,如下。
1、_q就不必解释了,作为阻塞队列的底层容器,其一定是需要存在的。说一下阻塞队列是生产者线程和消费者线程的交易场所,所以阻塞队列是临界资源。
2、_capacity,在当前的消费者生产者模型中,是不允许阻塞队列扩容的,阻塞队列满了就需要让生产者线程在_In条件下等待。
3、_mtx也不必解释,阻塞队列作为生产者线程和消费者线程的交易场所,即作为临界资源是一定需要锁保护的,防止阻塞队列中的数据因时序性导致的数据紊乱。
4、剩下的两个条件变量也是一定需要的,它们的作用是控制生产者和消费者的行动顺序(结合上文中的理论,这里换句话说就是维护生产者和消费者的同步关系)。
然后说下BlockingQueue类内的成员函数,如下。
1、(结合下图思考)默认构造需要把BlockingQueue类内的2个成员条件变量_In、_Out和1个锁成员变量_mtx都通过对应的初始化函数初始化,因为根据这些条件变量和锁变量的初始化规则,它们都不是全局的变量,只是局部的变量,而局部的变量就只能通过这些初始化函数初始化。
2、(结合下图思考)析构函数需要把BlockingQueue类内的2个成员条件变量_In、_Out和1个锁成员变量_mtx都通过下图的销毁函数销毁,因为根据这些条件变量和锁变量的销毁规则,因为这些变量只是局部变量,所以在初始化时只能通过上图的3个函数,而通过上图的3个函数初始化的变量就只能通过下图的3个销毁函数销毁。
3、生产者线程函数push和消费者线程函数pop,说一下,这里push和pop严格意义来讲并不能称为生产者线程函数和消费者线程函数,只是因为push函数在生产者线程函数productor中被调用了,所以把push函数也称为了生产者线程函数;pop被称为消费者线程函数的原因同理。push函数就是用于把push函数的参数传入队列queue中,而pop函数就是用于把队列queue的队头front元素取出并拿到,关于push和pop函数剩下的实现思路都在注释中,详情请见下文中的代码。
BlockQueue.h文件的整体代码
结合上面理论,BlockQueue.h的整体代码如下。
#include<iostream>
using namespace std;
#include<queue>
#include<pthread.h>
template<class T>
class BlockQueue
{
public:
BlockQueue(int capacity = 5)
:_capacity(capacity)
{
pthread_cond_init(&_Out,nullptr);
pthread_cond_init(&_In,nullptr);
pthread_mutex_init(&_mtx,nullptr);
}
~BlockQueue()
{
pthread_mutex_destroy(&_mtx);
pthread_cond_destroy(&_Out);
pthread_cond_destroy(&_In);
}
//生产者线程函数
void push(const T& x)
{
pthread_mutex_lock(&_mtx);//访问临界区需要加锁
//1、先检测当前的临界资源是否能满足条件,如果阻塞队列满了,表示交易场所中货物已经满了,此时不能让生产者继续生产,需要进入if分支,让生产者在条件变量中等待
//2、pthread_cond_wait函数存在虚假唤醒的情况,这是多线程编程中的一个重要概念。虚假唤醒指的是在等待条件变量时,即使条件变量的条件尚未满足,线程也可能会被唤醒,
//但这并不是因为条件实际上已经满足,而是因为一些其他的线程调用了pthread_cond_signal或pthread_cond_broadcast函数,或者由于一些信号中断了pthread_cond_wait函数的等待,
//既然存在伪唤醒的情况,我们就要想办法杜绝这种情况,为了正确使用pthread_cond_wait,我们需要while循环等待条件变量,而不仅仅是在if语句内等待,以处理虚假唤醒,通过这样的方式,
//就能100%确定临界资源是否满足条件。
//依据上面理论,错误示例如下:
//if(_q.size()==_capacity)
//正确示例如下:
while(_q.size()==_capacity)
{
//pthread_cond_wait:我们竟然是在临界区中wait!此时当前线程函数是持有锁的!如果该线程去等待被挂起了,锁该怎么办呢?毕竟如果不解锁,其他线程函数就没法访问临界区了。
//pthread_cond_wait第二个参数是一个锁,当成功调用wait之后,传入的锁,会被自动释放!
//当我被唤醒时,我从哪里醒来呢??答案:从哪里阻塞,就从哪里被唤醒,所以就是在wait函数内部被唤醒。
//但有一个问题,在wait函数内部被唤醒后,此时还在临界区内,访问临界区需要获取到锁,但此时没有锁,怎么办呢?答案:不必担心,被唤醒后,在wait函数内部剩下的逻辑中会帮我们拿到锁
pthread_cond_wait(&_In,&_mtx);
}
//访问临界资源
_q.push(x);
//1、push是生产者线程函数。对于消费者来说,如果消费者在消费时把交易场所(即队列)中的货物消费完了,此时消费者就会被阻塞在条件变量的等待队列中,需要被人唤醒。
//被谁唤醒呢?只能是生产者,为什么呢?只有交易场所还有货物,即队列中还有有效数据,消费者才能被唤醒继续消费,其他人不知道交易场所是否还有货物,但作为生产者而言,我刚刚才在
//临界区中生产了一个货物,所以我知道交易场所(即队列)中是一定是有货物的(因为即使没有,由于我刚生产了一个,所以也会变成有货物),所以才说只能由生产者去唤醒消费者。
//2、说一下,这个pthread_cond_signal函数可以在临界区内(即可以位于加锁和解锁函数之间),也可以在临界区外,没有区别。比如在临界区内时(注意如果在内,则必须位于访问临界资源的代码下面),此时生产者线程还没有释放锁,所以即使消费者
//被唤醒,但因为消费者没有锁,所以还是会在消费者线程函数中的pthread_cond_wait函数处卡住,这是因为在pthread_cond_wait函数内部有加锁函数,加锁函数在等待锁,所以就卡住了,
//所以消费者无法在没有锁的状态下访问临界区,所以不必担心误访问;再比如在临界区外时,在执行到pthread_cond_signal函数但还没进入函数内时,此时生产者线程已经释放了锁,但因为此时消费者还没有被唤醒,所以
//也抢占不到锁,在调用pthread_cond_signal函数把消费者唤醒后,消费者线程还要在pthread_cond_wait函数内部等待锁的逻辑处卡住,等拿到锁后,才能访问临界区,所以消费者也无法在没有锁的状态下访问临界区,所以不必担心误访问。
//综上所述,因为两种情况导致的结果是相同的,所以才说pthread_cond_signal函数可以在临界区内,也可以在临界区外,没有区别。
//3、说一下,如果消费者在消费时没有把交易场所(即队列)中的货物消费完、没有被阻塞在条件变量的等待队列中、不需要被人唤醒,那在生产者线程函数中(即在当前注释所在的函数中)调用pthread_cond_signal函数唤醒消费者时,消费者会丢弃掉这个唤醒信息,
//所以即使消费者不需要被唤醒,这里调用pthread_cond_signal函数唤醒消费者线程也不会出现什么问题。
pthread_cond_signal(&_Out);
pthread_mutex_unlock(&_mtx);//退出临界区需要解锁
}
//消费者线程函数
void pop(T* x)//x是输出型参数,让调用pop的人拿到数据
{
pthread_mutex_lock(&_mtx);//访问临界区需要加锁
//1、先检测当前的临界资源是否能满足条件,如果阻塞队列为空,表示交易场所中已经没有货物了,此时不能让消费者继续消费,需要进入if分支,让消费者在条件变量中等待
//2、pthread_cond_wait函数存在虚假唤醒的情况,这是多线程编程中的一个重要概念。虚假唤醒指的是在等待条件变量时,即使条件变量的条件尚未满足,线程也可能会被唤醒,
//但这并不是因为条件实际上已经满足,而是因为一些其他的线程调用了pthread_cond_signal或pthread_cond_broadcast函数,或者由于一些信号中断了pthread_cond_wait函数的等待,
//既然存在伪唤醒的情况,我们就要想办法杜绝这种情况,为了正确使用pthread_cond_wait,我们需要while循环等待条件变量,而不仅仅是在if语句内等待,以处理虚假唤醒,通过这样的方式,
//就能100%确定临界资源是否满足条件。
//依据上面理论,错误示例如下:
//if(_q.size()==0)
//正确示例如下:
while(_q.size()==0)
{
//pthread_cond_wait:我们竟然是在临界区中wait!此时当前线程函数是持有锁的!如果该线程去等待被挂起了,锁该怎么办呢?毕竟如果不解锁,其他线程函数就没法访问临界区了。
//pthread_cond_wait第二个参数是一个锁,当成功调用wait之后,传入的锁,会被自动释放!
//当我被唤醒时,我从哪里醒来呢??答案:从哪里阻塞,就从哪里被唤醒,所以就是在wait函数内部被唤醒。
//但有一个问题,在wait函数内部被唤醒后,此时还在临界区内,访问临界区需要获取到锁,但此时没有锁,怎么办呢?答案:不必担心,被唤醒后,在wait函数内部剩下的逻辑中会帮我们拿到锁
pthread_cond_wait(&_Out,&_mtx);
}
//访问临界资源
*x=_q.front();
_q.pop();
pthread_mutex_unlock(&_mtx);//退出临界区需要解锁
//1、pop是消费者线程函数。对于生产者来说,如果生产者在生产时把交易场所(即队列)产满了,此时生产者就会被阻塞在条件变量的等待队列中,需要被人唤醒。
//被谁唤醒呢?只能是消费者,为什么呢?只有交易场所没有被产满,即队列中还有剩余空间,生产者才能被唤醒继续生产,其他人不知道交易场所是否满了,但作为消费者而言,我刚刚才在
//临界区中消费了一次,所以我知道交易场所(即队列)中是一定没有被产满的(因为即使满了,由于我刚消费了一个,所以也会变成不满),所以才说只能由消费者去唤醒生产者。
//2、说一下,这个pthread_cond_signal函数可以在临界区内(即可以位于加锁和解锁函数之间),也可以在临界区外,没有区别。比如在临界区内时(注意如果在内,则必须位于访问临界资源的代码下面),此时消费者线程还没有释放锁,所以即使生产者
//被唤醒,但因为生产者没有锁,所以还是会在生产者线程函数中的pthread_cond_wait函数处卡住,这是因为在pthread_cond_wait函数内部有加锁函数,加锁函数在等待锁,所以就卡住了,
//所以生产者无法在没有锁的状态下访问临界区,所以不必担心误访问;再比如在临界区外时,在执行到pthread_cond_signal函数但还没进入函数内时,此时消费者线程已经释放了锁,但因为此时生产者还没有被唤醒,所以
//也抢占不到锁,在调用pthread_cond_signal函数把生产者唤醒后,生产者线程还要在pthread_cond_wait函数内部等待锁的逻辑处卡住,等拿到锁后,才能访问临界区,所以生产者也无法在没有锁的状态下访问临界区,所以不必担心误访问。
//综上所述,因为两种情况导致的结果是相同的,所以才说pthread_cond_signal函数可以在临界区内,也可以在临界区外,没有区别。
//3、说一下,如果生产者在生产时没有把交易场所(即队列)产满、没有被阻塞在条件变量的等待队列中、不需要被人唤醒,那在消费者线程函数中(即在当前注释所在的函数中)调用pthread_cond_signal函数唤醒生产者时,生产者会丢弃掉这个唤醒信息,
//所以即使生产者不需要被唤醒,这里调用pthread_cond_signal函数唤醒生产者线程也不会出现什么问题。
pthread_cond_signal(&_In);
}
private:
queue<T> _q; //阻塞队列,代表交易场所
int _capacity; //阻塞队列的容量上限,避免queue扩容
pthread_mutex_t _mtx; //通过互斥锁保证队列安全
pthread_cond_t _In; //条件变量,用它表示阻塞队列还有空间剩余,即还可以继续填放货物
pthread_cond_t _Out; //条件变量,用它表示阻塞队列中还存在有效数据,即还可以继续消费货物
};
对上面代码的补充说明:
1、由于我们实现的是单生产者、单消费者的生产者消费者模型,因此我们不需要维护生产者和生产者之间的关系,也不需要维护消费者和消费者之间的关系,我们只需要维护生产者和消费者之间的同步与互斥关系即可。
2、将BlockingQueue当中存储的数据模板化,方便以后需要时进行复用。
3、这里设置BlockingQueue存储数据的上限为5、即_capacity为5,当阻塞队列中存储了五个数据时生产者就不能进行生产了,此时生产者就应该被阻塞。
对【基于阻塞队列BlockingQueue的生产者消费者模型的模拟实现】的测试
将上面模拟实现部分的ConProd.c文件编译并运行后,得到的结果如下图。说一下,生产和消费数据是从整形1开始的,下图只是运行结果的一小个片段,从8342开始只是因为CPU运行的太快了,一下就刷到了8千多。在CPU分给生产者线程和消费者线程的时间片中,在这个时间片内已经足够让生产者把阻塞队列、即交易中心产满数据;阻塞队列被产满后,分给消费者的时间片也足够让消费者线程把阻塞队列、即交易中心中已经产满的数据全部消费完,所以生产和消费的顺序如下图,是连续生产5个后,再连续消费5个。
如果我想步调一致,即生产一个就立马消费一个,可以通过sleep函数让生产者生产的速度和消费者消费的速度都慢下来但速度相同(注意双方sleep的时间一定是相等的)。只需要在上文的ConProd.c的代码的基础上(下图1就是ConProd.c的代码),加上如下图1的两个红框处的代码,这样即可完成步调一致,让生产者每秒生产1个数据、消费者每秒消费1个数据,运行结果如下图2。
图1如下。
图2如下,下图只是运行结果中的一小段片段。
如果我不想步调一致,比如想让生产者生产的速度比消费者消费的速度要快,只需要在上文的ConProd.c的代码的基础上(下图1就是ConProd.c的代码),加上如下图1的红框处的代码即可完成让生产者生产的速度比消费者消费的速度要快,运行结果如下图2;再比如想让生产者生产的速度比消费者消费的速度要慢,只需要在上文的ConProd.c的代码的基础上(下图3就是ConProd.c的代码),加上如下图3的红框处的代码即可完成让生产者生产的速度比消费者消费的速度要慢,运行结果如下图4。
图1如下。
图2如下,下图只是运行结果中的一小段片段。可以看到,因为生产者线程没有sleep,所以一下子就把阻塞队列(即交易场所)给产满了,后序消费者慢悠悠的消费数据,每秒只消费1个,然后消费完毕后生产者又立马重新把阻塞队列产满,后序轮询【消费者每秒消费1个数据后、生产者又立马把阻塞队列产满】这样的操作。
图3如下。
图4如下,下图只是运行结果中的一小段片段。可以看到,因为消费者线程没有sleep,而生产者线程sleep了、在慢悠悠的每秒生产1个数据,所以生产者线程每生产1个数据,消费者线程立马就能把这1个数据给消费完。
如果我们想满足某一条件时再唤醒对应的生产者线程或消费者线程,比如可以当阻塞队列当中存储的数据大于队列容量的一半时,再唤醒消费者线程进行消费,这只需要在上文的BlockQueue.h的代码的基础上(下图1就是BlockQueue.h的代码),加上如下图1的红框处的代码即可,运行结果如下图2。
图1如下。
图2如下,下图只是运行结果中的一小段片段。因为阻塞队列的容量是5,所以按理来说只有生产者生产3个数据后,消费者才能开始消费数据,可以看到下图结果是符合预期的。说一下,这里我们是通过sleep限制了生产者线程函数生产的速度了的,但没有让消费者线程去sleep,如果不对生产者线程函数生成的速度进行限制,则看到的结果就是程序在刚开始运行时,生产者线程就能连续生产5个数据,把阻塞队列产满,然后消费者线程又能立刻连续消费5个数据,把阻塞队列中的数据清空,然后又连续生产5、然后连续消费5、往后循环这个现象。
对基于计算任务的生产者消费者模型的模拟实现
为了方便理解,下面我们以单生产者线程、单消费者线程为例进行实现。
【基于计算任务的生产者消费者模型】说简单点就是在上文中讲解过的【基于阻塞队列BlockingQueue的生产者消费者模型】的基础上,把生产和消费的数据从整形数字变成了函数,因此【基于计算任务的生产者消费者模型】的模拟实现只需要在【基于阻塞队列BlockingQueue的生产者消费者模型】的基础上做出一点修改即可。
哪些修改呢?
Task.h的整体代码
修改1:首先创建一个Task.h文件,在里面实现一个Task类,Task.h的整体代码如下。
#pragma once
#include<iostream>
using namespace std;
#include<functional>
typedef function<int(int,int)> func_t;//C++11的包装器
class Task
{
public:
Task()
{}
~Task()
{}
Task(int x,int y,func_t f):_x(x), _y(y), _f(f)
{}
int operator()()
{
return _f(_x,_y);
}
int _x;
int _y;
func_t _f;
};
修改2:然后在ConProd.c文件中#include"Task.h",并把BlockQueue的类型模板参数从int类变成Task类,然后还要把生产者线程函数productor和消费者线程函数consumer的代码做修改。
修改生产者线程函数的思路为:通过srand和rand函数生成两个随机数x和y,然后创建一个myadd函数,然后把这3个变量的值传给Task变量t,让t调用默认构造完成初始化,然后把Task变量t插入push进阻塞队列中。
修改消费者线程函数的思路为:创建一个Task变量t,然后让t作为输出型参数,把t作为实参传给阻塞队列的pop函数的形参,pop函数结束后,Task变量t就被完成赋值了,也就拿到了阻塞队列中的数据(即Task),然后以【cout<<"consumer:"<<t._x<<'+'<<t._y<<'='<<t()<<endl】的格式打印这个数据,t()是调用了Task类的成员函数operator()。
ConProd.c文件的整体代码
按照上面的思路将ConProd.c文件中的生产者线程函数和消费者线程函数的代码修改后,ConProd.c文件的整体代码如下。
#include"BlockQueue.h"
#include<pthread.h>
#include"Task.h"
#include<unistd.h>//提供sleep函数
#include<time.h>//提供srand、rand函数
int myadd(int x,int y)
{
return x+y;
}
//消费者线程函数
void* consumer(void*args)
{
BlockQueue<Task> *bq = (BlockQueue<Task>*)args;
//错误写法如下:
//while(bq->size()!=0)
//正确写法如下:为什么呢?因为队列中没有数据时,会被阻塞、等待生产者线程生产货物后继续消费;而不是按照上面错误写法把交易场所中的货物消费完后就退出
while(true)
{
Task t;
bq->pop(&t);
cout<<"consumer:"<<t._x<<'+'<<t._y<<'='<<t()<<endl;
}
return nullptr;
}
//生产者线程函数
void* productor(void*args)
{
srand((uint16_t)time(nullptr));
BlockQueue<Task> *bq = (BlockQueue<Task>*)args;
while(true)
{
int x=rand()%1000;
int y=rand()%1000;
cout<<"productor:"<<x<<'+'<<y<<'='<<"?"<<endl;
Task t(x, y, myadd);
bq->push(t);
}
return nullptr;
}
int main()
{
BlockQueue<Task> *bq = new BlockQueue<Task>;
pthread_t c,p;//为线程ID
pthread_create(&c,nullptr, consumer, (void*)bq);
pthread_create(&p,nullptr, productor, (void*)bq);
pthread_join(c,nullptr);
pthread_join(p,nullptr);
return 0;
}
3、注意BlockQueue.h文件的代码是不需要经过任何修改的,直接把上文中讲解【对基于阻塞队列BlockingQueue的生产者消费者模型的模拟实现】部分的BlockQueue.h文件拿来用即可。
对【基于计算任务的生产者消费者模型的模拟实现】的测试
将上面模拟实现部分的ConProd.c文件编译并运行后,得到的结果如下图。说一下,下图只是运行结果的一小个片段。在CPU分给生产者线程和消费者线程的时间片中,在这个时间片内已经足够让生产者把阻塞队列、即交易中心产满数据;阻塞队列被产满后,分给消费者的时间片也足够让消费者线程把阻塞队列、即交易中心中已经产满的数据全部消费完,所以生产和消费的顺序如下图,是连续生产5个后,再连续消费5个。
如果我想步调一致,即生产一个就立马消费一个,可以通过sleep函数让生产者生产的速度和消费者消费的速度都慢下来但速度相同(注意双方sleep的时间一定是相等的)。只需要在上文的ConProd.c的代码的基础上(下图1就是ConProd.c的代码),加上如下图1的两个红框处的代码,这样即可完成步调一致,让生产者每秒生产1个数据、消费者每秒消费1个数据,运行结果如下图2。
图1如下。
图2如下,下图只是运行结果中的一小段片段。
如果我不想步调一致,比如想让生产者生产的速度比消费者消费的速度要快,只需要在上文的ConProd.c的代码的基础上(下图1就是ConProd.c的代码),加上如下图1的红框处的代码即可完成让生产者生产的速度比消费者消费的速度要快,运行结果如下图2;再比如想让生产者生产的速度比消费者消费的速度要慢,只需要在上文的ConProd.c的代码的基础上(下图3就是ConProd.c的代码),加上如下图3的红框处的代码即可完成让生产者生产的速度比消费者消费的速度要慢,运行结果如下图4。
图1如下。
图2如下,下图只是运行结果中的一小段片段。可以看到,因为生产者线程没有sleep,所以一下子就把阻塞队列(即交易场所)给产满了,后序消费者慢悠悠的消费数据,每秒只消费1个,然后消费完毕后生产者又立马重新把阻塞队列产满,后序轮询【消费者每秒消费1个数据后、生产者又立马把阻塞队列产满】这样的操作。
图3如下。
图4如下,下图只是运行结果中的一小段片段。可以看到,因为消费者线程没有sleep,而生产者线程sleep了、在慢悠悠的每秒生产1个数据,所以生产者线程每生产1个数据,消费者线程立马就能把这1个数据给消费完。
多生产者多消费者模型的模拟实现(以及对多生产者和多消费者模型的感悟)
在上文中,不管是【对基于阻塞队列BlockingQueue的生产者消费者模型的模拟实现】还是【对基于计算任务的生产者消费者模型的模拟实现】,我们说过为了方便理解,之前模拟实现它们时都是只创建一个生产者线程和只创建一个消费者线程,也就是单生产者单消费者模型。但实际上不管是【对基于阻塞队列BlockingQueue的生产者消费者模型的模拟实现】还是【对基于计算任务的生产者消费者模型的模拟实现】,一般来说都应该是多生产者多消费者模型,即应该同时创建多个生产者线程和创建多个消费者线程,以上文中的【对基于计算任务的生产者消费者模型的模拟实现】为例进行修改,将它从单生产者单消费者模型改成多生产者多消费者模型。
哪些地方需要修改呢?
这里先插入一点和【哪些地方需要修改呢?】的内容无关的内容,然后再说明【哪些地方需要修改呢?】的内容。
对多生产者和多消费者模型的感悟:先说一下,这里咱们以知道了答案的视角下可以发现是没几个地方需要修改的,对比修改前,修改后的代码也就是多调用了几次创建线程的函数以此多创建几个线程,然后多调用了几次等待线程的函数以此回收线程。为什么单生产单消费模型改成多生产多消费模型会这么容易呢?或者说为什么从A:【需要维护生产者和消费者的互斥关系、同步关系】变成B:【在A的基础上,现在又加上了需要维护生产者和生产者的互斥关系、消费者和消费者的互斥关系】会这么容易呢?
其原因是因为对于生产者和生产者之间、对于消费者和消费者之间、它们也受到互斥锁的限制、它们都是需要竞争锁才能进入临界区也就是阻塞队列完成各自任务的,在多生产者和多消费者模型下的所有的线程,不管是生产者还是消费者,每次只能有一个线程进入临界区,所以只靠互斥锁就能很好的维护生产者和生产者的互斥关系以及消费者和消费者的互斥关系。在单生产和单消费模型中,我们的互斥锁其实就已经具备这些功能了,即已经能很好的维护消费者和消费者的互斥关系、生产者和生产者的互斥关系了,只是说我们在那时因为没有多个生产者、也没有多个消费者,所以没有这种需求,所以并不是说之前不可以维护【生产者和生产者的互斥关系、消费者和消费者的互斥关系】,而只是不需要维护【生产者和生产者的互斥关系、消费者和消费者的互斥关系】,如果我想做,我是能做的,既然我本来就能做,所以从A:【需要维护生产者和消费者的互斥关系、同步关系】变成B:【在A的基础上,现在又加上了需要维护生产者和生产者的互斥关系、消费者和消费者的互斥关系】才会这么容易。
在上面插入了一些与正题无关的内容,现在回到正题:哪些地方需要修改呢?
1、如下图1,需要将上文中【对基于计算任务的生产者消费者模型的模拟实现】部分中的ConProd.c文件中的main函数从左边的样子修改成右边的样子。然后如下图2,需要将consumer和productor函数从左边的样子修改成右边的样子(也就是把靠左的红框处的代码修改成靠右的红框处的代码)。
图1如下。
图2如下。
2、 注意Task.h文件的代码是不需要经过任何修改的,直接把上文中讲解【对基于计算任务的生产者消费者模型的模拟实现】部分的Task.h文件拿来用即可。
3、注意BlockQueue.h文件的代码是不需要经过任何修改的,直接把上文中讲解【对基于计算任务的生产者消费者模型的模拟实现】部分的BlockQueue.h文件拿来用即可。
ConProd.c文件的整体代码
结合上面的思路进行修改后,ConProd.c文件的整体代码如下。
#include"BlockQueue.h"
#include<pthread.h>
#include"Task.h"
#include<unistd.h>//提供sleep函数
#include<time.h>//提供srand、rand函数
int myadd(int x,int y)
{
return x+y;
}
//消费者线程函数
void* consumer(void*args)
{
BlockQueue<Task> *bq = (BlockQueue<Task>*)args;
//错误写法如下:
//while(bq->size()!=0)
//正确写法如下:为什么呢?因为队列中没有数据时,会被阻塞、等待生产者线程生产货物后继续消费;而不是按照上面错误写法把交易场所中的货物消费完后就退出
while(true)
{
Task t;
bq->pop(&t);
cout<<pthread_self()<<"consumer:"<<t._x<<'+'<<t._y<<'='<<t()<<endl;
}
return nullptr;
}
//生产者线程函数
void* productor(void*args)
{
srand((uint16_t)time(nullptr));
BlockQueue<Task> *bq = (BlockQueue<Task>*)args;
while(true)
{
int x=rand()%1000;
int y=rand()%1000;
cout<<pthread_self()<<"productor:"<<x<<'+'<<y<<'='<<"?"<<endl;
Task t(x, y, myadd);
bq->push(t);
}
return nullptr;
}
int main()
{
BlockQueue<Task> *bq = new BlockQueue<Task>;
pthread_t c[2],p[2];//为线程ID
pthread_create(c,nullptr, consumer, (void*)bq);
pthread_create(c+1,nullptr, consumer, (void*)bq);
pthread_create(p,nullptr, productor, (void*)bq);
pthread_create(p+1,nullptr, productor, (void*)bq);
pthread_join(c[0],nullptr);
pthread_join(c[1],nullptr);
pthread_join(p[0],nullptr);
pthread_join(p[1],nullptr);
return 0;
}
对【多生产者多消费者模型的模拟实现】的测试
将上面模拟实现部分的ConProd.c文件编译并运行后,得到的结果如下图。说一下,下图只是运行结果的一小个片段。在CPU分给生产者线程和消费者线程的时间片中,在这个时间片内已经足够让生产者把阻塞队列、即交易中心产满数据;阻塞队列被产满后,分给消费者的时间片也足够让消费者线程把阻塞队列、即交易中心中已经产满的数据全部消费完,所以生产和消费的顺序如下图,是连续生产5个后,再连续消费5个。
而且可以看到,是有不同的线程在生产的(每条语句开头部分的长串数字就是各自线程通过pthread_self()函数打印出的自己的线程ID),也是有不同的线程的消费的,这就是多生产者多消费者模型了。
说一下、这里对生产者消费者步调一致和非步调一致的测试就不测了,类似的内容在上文中已经有过两次测试了,详情见上文对【基于阻塞队列BlockingQueue的生产者消费者模型的模拟实现】的测试部分和对【基于计算任务的生产者消费者模型的模拟实现】的测试部分。
【单生产者单消费者模型】和【多生产者多消费者模型】的应用场景或者说意义(包括如何选择模型比较合适)
问题:在进程中有多个生产线程并且还有多个消费线程时,因为有互斥锁的存在,所以不管一个线程是消费者线程还是生产者线程,只要是一个线程想要访问临界区(即阻塞队列或者说交易场所),就都得持有锁,而当一个线程持有锁后,其他所有线程都无法持有该锁,也就是说在同一时间只能有一个线程访问临界区,那这样貌似和单生产单消费的模型没有任何区别,那多生产多消费的意义在哪呢?请举例说明。
答案如下:不要肤浅地认为把任务Task或者说数据在临界区(可以把它形象的称为交易场所)存放或者取走,或者说不要肤浅地认为线程访问临界区资源就是在生产和消费。生产任务本身和拿到任务后处理才是最消耗时间的,把生产出的任务Task放到临界区和把临界区的任务拿走,即访问临界区资源反而是最简单的。虽然多生产多消费的场景下和单生产单消费一样,在同一时间也只有一个生产线程可以访问临界区,但若干生产线程在访问临界区前,即若干生产线程生产任务时,是可以有多个生产线程并发的生产各自的任务的,只是任务生产完毕后,将任务送到临界区时,在同一时间只能有一个生产线程可以访问临界区,即把任务Task放进阻塞队列。同理,虽然多生产多消费的场景下和单生产单消费一样,在同一时间也只能有一个消费线程可以访问临界区,即把任务Task从阻塞队列中接取出来,但在访问完临界区后,即若干消费者线程拿到临界区的任务后,是可以有多个消费者线程并发的执行各自的任务的。这才是多生产多消费的价值。
举个例子说明多生产者多消费者模型的意义。在单生产者单消费者模型下,消费者线程从阻塞队列(即交易场所)中接取到一个任务后,需要等待键盘或者网络资源就绪,如果这个资源一直不就绪,那么这个消费者线程就一直卡着,与此同时,生产者线程正有条不紊的持续生产任务并在拿到锁后把任务放进阻塞队列中(此时进程中总共两个线程,即一个生产者线程和一个消费者线程,因为消费者线程在等待键盘或者网络资源,处于阻塞状态,所以此时没有线程和生产者线程抢互斥锁,生产者线程一直能抢占锁成功),等到阻塞队列被放满了任务、再也放不下后,消费者线程还是在等待键盘或者网络资源就绪,还没有把最初的任务处理完,此时生产者线程就呼叫消费者线程,说:“消费者线程啊,你快点来接取任务吧!”,但消费者线程连当前任务都没有处理完,所以更不可能再去接任务,所以这就是单生产者单消费者模型的缺点,此时如果有多个消费者线程,其他消费者线程就能帮忙缓解压力,各自去接取任务后处理任务。所以当有类似的情景,此时就应该使用多生产者多消费者模型。
————end————
理解了上几段中说明的多生产者多消费者模型的意义后,可以发现上文中模拟实现多生产者多消费者模型时,虽然这个模拟实现是正确的,但它对比我们模拟实现的单生产者单消费者模型其实是体现不出来优势的,因为在上文模拟实现的多生产者和多消费者模型中,每个生产者线程和消费者线程处理的任务都太简单了,生产者生产任务的流程就是创建一个Task变量t,通过两个整形值和一个函数初始化 t 后就将t插入push进队列queue中;消费者消费任务的流程就是将一个个Task变量从队列queue从取出,然后调用一下Task变量中的operator()函数,这些操作都不需要什么时间成本,可能几纳秒就执行完毕了,所以不会出现某个生产者或者消费者线程处于忙碌的情况,所以也就不太需要其他线程来帮忙分担压力,只需要一个生产者线程和一个消费者线程就足够流畅地运作了,所以才说上文中模拟实现多生产者多消费者模型时,虽然这个模拟实现是正确的,但它对比我们模拟实现的单生产者单消费者模型其实是体现不出来优势的。
如何选择模型比较合适
从上一段我们也能得出一个启示:要在合适的情景下选择合适的模型,不要无脑地使用多生产者多消费者模型,有时单生产者单消费者模型其实更好用。选择模型的依据为:如果生产者和消费者线程在生产或者消费任务时可能很耗时间(注意不要将【生产或者消费任务】这些动作和【把任务放进阻塞队列或者从阻塞队列中取出】混淆,它们是不一样的),则使用多生产者多消费者模型;如果生产者和消费者线程在生产或者消费任务时所花的时间很短,则使用单生产者单消费者模型。