linux入门---消费者生产者模型模拟实现

news2024/9/28 4:31:34

目录标题

  • 消费者生产者模型的理解
  • 单生产单消费模拟实现
    • blockqueue.cpp准备工作
    • MainCp.cpp的准备工作
    • 构造函数和析构函数的模拟实现
    • push函数的实现
    • pop函数的实现
    • poductor_func函数的实现
    • consumer_func函数的实现
    • 程序的测试
    • 程序改进一
    • 程序的改进二
    • 程序的改进三
  • 多生产多消费模拟实现
  • 生产者消费者模型的意义

消费者生产者模型的理解

在前面的学习我们知道了什么是生产者消费者模型,那么这篇文章我们将一步一步的将这个模型实现,首先我们知道要实现消费者生产者模型本质上就是维护好3 2 1原则,3表示的是3种关系分别为生产者和生产者的关系,消费者和消费者的关系,生产者和消费者之间的关系,2表示的两个角色:生产者和消费者,1表示一个缓冲区这个消费者从这个缓冲区中拿取数据,生产者往这个缓冲区中存放数据,那么我们接下来就要通过阻塞队列(Blocking Queue)来实现消费者生产者消费者当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞),那么接下来我们就先模拟实现一个单生产单消费的模型。

单生产单消费模拟实现

blockqueue.cpp准备工作

首先创建一个类用来描述这个模型,类名为:BlockQueue ,因为缓冲区中会存放各种各样的数据所以该类还得是一个模板类:

//blockqueue.hpp
template<class T>
class BlockQueue 
{
public:
    
private:
};

然后我们就要思考这个类中得包含哪些成员变量,首先类中必须得有队列来作为缓冲区来临时的存放数据,因为目标队列是阻塞队列所以还得创建一个锁对象来保证消费者和生产者之间的阻塞访问,因为我们要防止当某个条件不满足的时候依然会有线程不停的占用锁资源所以我们还得创建两个条件变量一个用来将消费者线程挂起一个用来将生产者线程挂起,最后还得创建一个变量用来表示当前的缓冲区最对能够存储数据,那么这就是类中所有成员变量:

//blockqueue.hpp
template<class T>
class BlockQueue 
{
public:
    
private:
    //需要一个锁变量
    pthread_mutex_t _mutex;
    //需要一个生产者条件变量
    pthread_cond_t _pcond;
    //需要一个消费者条件变量
    pthread_cond_t _ccond;
    //表示当前队列最多存储数据个数
    int _maxcap;
    //queue来存储数据
    queue<T> _q;
};

然后我们就要实现这个函数的构造函数和析构函数,并且还得提供push函数用来往队列中插入数据,还得提供一个pop函数用来将获取队列中的数据并将该队列中的数据删除,因为push函数只需要将数据插入即可,所以该函数的参数类型就是const &类型,因为pop还得获取内容所以他的参数得是一个指针也就是输入型参数,那么这里的代码如下:

//blockqueue.hpp
template<class T>
class BlockQueue 
{
public:
    static const int gmaxcap;
     BlockQueue(int maxcap=gmaxcap)
    {
    }
    void push(const T& in)//插入到容器中的数据
    {     
    }
    void pop(T* out)//输出型参数
    {
    }
    ~BlockQueue()
    {
    }
private:
    //需要一个锁变量
    pthread_mutex_t _mutex;
    //需要一个生产者条件变量
    pthread_cond_t _pcond;
    //需要一个消费者条件变量
    pthread_cond_t _ccond;
    //表示当前队列最多存储数据个数
    int _maxcap;
    //queue来存储数据
    queue<T> _q;
};
template<class T>
const int BlockQueue<T>::gmaxcap=10;

那么这就是blockqueue.cpp的准备工作。

MainCp.cpp的准备工作

这个文件里面就是装的就是main函数,在main函数里面我们首先创建两个pthread_t对象和一个blockqueue对象,然后调用两个pthread_create函数创建两个线程并执行对应的函数,因为新线程要执行对应的函数,所以我们这里还得创建两个对应的函数,因为在执行的函数里面要使用blockqueue对象里面的函数,所以pthread_create函数在传递参数的时候就可以将blockqueue对象的地址传递过去,待线程执行完成之后我们就可以使用pthread_join函数将线程进行回收,那么这里的代码就如下:

#include<iostream>
#include"blockqueue.hpp"
using namespace std;
void *consumer_func(void * args)
{
    return nullptr;
}
void *poductor_func(void* args)
{
    return nullptr;
}
int main()
{
    pthread_t consumer,poductor;
    BlockQueue<int>* bq =new BlockQueue<int>();
    pthread_create(&consumer,nullptr,consumer_func,(void*) bq);
    pthread_create(&poductor,nullptr,poductor_func,(void*) bq);
    pthread_join(consumer,nullptr);
    pthread_join(poductor,nullptr);
    return 0;
}

构造函数和析构函数的模拟实现

对于构造函数就一个参数用来表示队列的大小,那么这个参数可以用gmaxcap作为默认参数将该参数赋值给_maxcap即可,然后在构造函数里面吗将两个条件变量和锁进行初始化即可:

BlockQueue(int maxcap=gmaxcap)
:_maxcap(maxcap)
{
    pthread_mutex_init(&_mutex,nullptr);
    pthread_cond_init(&_ccond,nullptr);
    pthread_cond_init(&_pcond,nullptr);
}

那么析构函数做的事情就完全相反将两个条件变量和锁资源销毁就可以,那么这里的代码如下:

~BlockQueue()
{
    pthread_mutex_destroy(&_mutex);
    pthread_cond_destroy(&_pcond);
    pthread_cond_destroy(&_ccond);
}

push函数的实现

该函数的第一步就是调用pthread_mutex_lock函数申请对象中的锁资源,申请成功了就得用if语句判断一下当前容器是否还有空间可以插入数据,那么这里我们就可以创建一个函数来专门实现这个功能:

bool is_full()
{
    if(_q.size()==_maxcap)
    {
        return true;
    }
    return false;
}

如果当前容器满了就调用pthread_cond_wait函数将当前的线程挂起:

void push(const T& in)//插入到容器中的数据
{
    //先申请锁
    pthread_mutex_lock(&_mutex);
    //申请成功之后就判断一下当前容器中的数据是否是满的
    if(is_full())
    {
        //如果是满的就挂起等待
        pthread_cond_wait(&_pcond,&_mutex);
    }     
}

如果当前的元素没有满的话我们就可以往队列里面插入数据,因为将数据插入之后当前的队列就一定不为空,所以就可以使用pthread_cond_signal函数来唤醒消费者线程线程,然后就可以将锁资源进行释放,那么这里的代码如下:

void push(const T& in)//插入到容器中的数据
{
    //先申请锁
    pthread_mutex_lock(&_mutex);
    //申请成功之后就判断一下当前容器中的数据是否是满的
    if(is_full())
    {
        //如果是满的就挂起等待
        pthread_cond_wait(&_pcond,&_mutex);
    }
    _q.push(in);
    //如果走到了这里就说明当前的存储空间一定是有数据的
    //这个时候就得将将消费者线程唤醒
    pthread_cond_signal(&_ccond);
    pthread_mutex_unlock(&_mutex);        
}

pop函数的实现

pop函数也是同样的道理首先也是申请锁资源,然后判断当前的队列是否有资源,那么这里我们也可以创建一个函数来实现这个功能:

bool is_empty()
{
    if(_q.size()==0)
    {
        return true;
    }
    return false;
}

如果当前的容器一个资源都没有的话就可以将当前的线程挂起

void pop(T* out)//输出型参数
{
    pthread_mutex_lock(&_mutex);
    if(is_empty())
    {
        pthread_cond_wait(&_ccond,&_mutex);
    }
}

线程被唤醒了就表明当前的队列中存在数据,那么这个时候就可以将out执行的内容赋值为队列的头部元素,然后将队列的头部元素删除,因为删除元素之后队列中就存在空缺的资源所以这个时候就可以将生产者的线程唤醒,然后释放锁的资源,那么这里的代码如下:

void pop(T* out)//输出型参数
{
    pthread_mutex_lock(&_mutex);
    if(is_empty())
    {
        pthread_cond_wait(&_ccond,&_mutex);
    }
    *out=_q.front();
    _q.pop();
    pthread_cond_signal(&_pcond);
    pthread_mutex_unlock(&_mutex);
}

poductor_func函数的实现

因为我们一开始实现的比较简单,队列中存放的是整形数据,所以我们这里就可以使用srand函数和rand函数生成随机数据,在这个函数的最开始我们干的第一件事就是将参数的类型进行转换将其转化为blockqueue指针的类型,然后我们就可以使用rand函数生产随机数据并调用blockqueue对象中的push函数插入到队列里面然后打印一段话用来作为标记,因为要插入的数据并不止一个所以可以将上面的内容放到一个循环里面来不停的执行最后返回nullptr即可,那么这里的代码如下:

void *poductor_func(void* args)
{
    BlockQueue<int>*bq=static_cast<BlockQueue<int>*>(args);
    while(true)
    {
        int data=rand()%10+1;
        bq->push(data);
        cout<<"生产了一个数据:"<<data<<endl;
    }
    return nullptr;
}

consumer_func函数的实现

同样的道理这里一开始也是将参数的类型进行强转,然后创建一个循环在循环体里面创建一个变量用来接收数据,然后调用blockqueue对象中的pop函数并将变量的地址传递进去,然后打印一句话用来作为标识值,最后返回nullptr来结束函数,那么这里的代码如下:

void *consumer_func(void * args)
{
    BlockQueue<int>*bq=static_cast<BlockQueue<int>*>(args);
    while(true)
    {
        int data;
        bq->pop(&data);
        cout<<"消费了一个数据: "<<data<<endl;
    }
    return nullptr;
}

程序的测试

程序完整的代码如下:

//MainCp.hpp
#include<iostream>
#include<unistd.h>
#include"blockqueue.hpp"
using namespace std;
void *consumer_func(void * args)
{
    BlockQueue<int>*bq=static_cast<BlockQueue<int>*>(args);
    while(true)
    {
        int data;
        bq->pop(&data);
        cout<<"消费了一个数据: "<<data<<endl;
    }
    return nullptr;
}
void *poductor_func(void* args)
{
    BlockQueue<int>*bq=static_cast<BlockQueue<int>*>(args);
    while(true)
    {
        int data=rand()%10+1;
        bq->push(data);
        cout<<"生产了一个数据:"<<data<<endl;
    }
    return nullptr;
}
int main()
{
    srand((unsigned long)time(nullptr));
    pthread_t consumer,poductor;
    BlockQueue<int>* bq =new BlockQueue<int>();
    pthread_create(&consumer,nullptr,consumer_func,(void*) bq);
    pthread_create(&poductor,nullptr,poductor_func,(void*) bq);
    pthread_join(consumer,nullptr);
    pthread_join(poductor,nullptr);
    return 0;
}
//blockqueue.hpp
#include<iostream>
#include<queue>
#include<pthread.h>
using namespace std;
template<class T>
class BlockQueue 
{
    static const int gmaxcap;
public:
    BlockQueue(int maxcap=gmaxcap)
    :_maxcap(maxcap)
    {
        pthread_mutex_init(&_mutex,nullptr);
        pthread_cond_init(&_ccond,nullptr);
        pthread_cond_init(&_pcond,nullptr);
    }
    void push(const T& in)//插入到容器中的数据
    {
        //先申请锁
        pthread_mutex_lock(&_mutex);
        //申请成功之后就判断一下当前容器中的数据是否是满的
        if(is_full())
        {
            //如果是满的就挂起等待
            pthread_cond_wait(&_pcond,&_mutex);
        }
        _q.push(in);
        //如果走到了这里就说明当前的存储空间一定是有数据的
        //这个时候就得将将消费者线程唤醒
        pthread_cond_signal(&_ccond);
        pthread_mutex_unlock(&_mutex);        
    }
    void pop(T* out)//输出型参数
    {
        pthread_mutex_lock(&_mutex);
        if(is_empty())
        {
            pthread_cond_wait(&_ccond,&_mutex);
        }
        *out=_q.front();
        _q.pop();
        pthread_cond_signal(&_pcond);
        pthread_mutex_unlock(&_mutex);
    }
    ~BlockQueue()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_pcond);
        pthread_cond_destroy(&_ccond);
    }
private:
    bool is_full()
    {
        if(_q.size()==_maxcap)
        {
            return true;
        }
        return false;
    }
    bool is_empty()
    {
        if(_q.size()==0)
        {
            return true;
        }
        return false;
    }
    //需要一个锁变量
    pthread_mutex_t _mutex;
    //需要一个生产者条件变量
    pthread_cond_t _pcond;
    //需要一个消费者条件变量
    pthread_cond_t _ccond;
    //表示当前队列最多存储数据个数
    int _maxcap;
    //queue来存储数据
    queue<T> _q;
};
template<class T>
const int BlockQueue<T>::gmaxcap=10;

如果大家就拿上面的代码进行测试的话会发现这里的数据打印的非常的快而且非常的乱,所以我们就可以在生产者线程的循环里面添加一个sleep语句让其一秒钟生产插入一个数据,这样消费者线程就总是处于挂起的状态,这样我们就可以看到生产者生产一个数据消费者就消费一个数据的现象,那么这里的运行的现象如下:

在这里插入图片描述

程序改进一

上面的代码在判断容器是否为空或者满的地方可能会出现伪唤醒的问题,上面在唤醒线程的时候采用的是pthread_cond_signal函数,这个函数一次只会唤醒一个线程,但是当前等待的线程可能会存在多个,而且if语句只会判断一次,所以未来一旦将唤醒函数改成pthread_cond_broadcast的话就可能会出现问题,比如说当前的队列只有一个数据,但是等待的消费者线程有10个而且if语句值判断一次并且在等待之前就已经判断了,如果一次性将其全部唤醒的话就会出现错误,所以这里改进的方式就是将if语句的判断改成while语句的判断:

void push(const T& in)//插入到容器中的数据
{
    //先申请锁
    pthread_mutex_lock(&_mutex);
    //申请成功之后就判断一下当前容器中的数据是否是满的
    while(is_full())
    {
        //如果是满的就挂起等待
        pthread_cond_wait(&_pcond,&_mutex);
    }
    _q.push(in);
    //如果走到了这里就说明当前的存储空间一定是有数据的
    //这个时候就得将将消费者线程唤醒
    pthread_cond_signal(&_ccond);
    pthread_mutex_unlock(&_mutex);        
}
void pop(T* out)//输出型参数
{
    pthread_mutex_lock(&_mutex);
    while(is_empty())
    {
        pthread_cond_wait(&_ccond,&_mutex);
    }
    *out=_q.front();
    _q.pop();
    pthread_cond_signal(&_pcond);
    pthread_mutex_unlock(&_mutex);
}

程序的改进二

我们上面测试的代码只是简单的做到了将整形变量放到了队列当中,然后从队列里面再获取整形变量,这样做似乎没有发挥模版的功能,那么这里我们想让该模型实现更多的功能所以这里就再创建一个文件,在文件里面再创建一个类用来存放数据和处理数据有关的函数,有了这个类之后就可以将该类创建出来的对象放到队列里面,然后消费者线程再从这个队列里面获取数据,然后消费者就可以根据类对象中的函数来实现不同的功能,比如说实现两个数据的+ - * / %生产者线程创建两个数据和一个操作符发送到队列里面,然后消费者线程就拿到这些数据并计算其对应的结果,那么这里我们就创建一个名为CalTask的类,那么生产者就将两个数据和一个操作符放到这个类里面,又因为类中得有对应的函数用来对数据进行操作,所以类中还得存在一个function对象,那么这里的代码如下:

class CalTask
{
    using func_t=function<int(int,int,char)>;
public:
private:
    int _x;
    int _y;
    char _op;
    func_t _callback;
};

那么该类的构造函数就是接收两个数据和一个操作符和一个调用对象对类内变量进行初始化,又因为存在只创建不需要初始化的现象,所以这里还得添加一个无参的默认构造函数,在这个函数里面什么事情都不用做:

CalTask()
{}
CalTask(int x,int y,char op,func_t func)
:_x(x)
,_y(y)
,_op(op)
,_callback(func)
{}

当消费者线程得到该类的对象时是直接使用该类的函数调用重载来执行相应的任务,所以还得添加函数调用重载,这个重载里面就是创建变量然后通过函数计算数据执行的结果,最后创建一个缓冲区记录数据计算的结果最后打印出来,那么这里的代码如下:

string operator()()
{
    int result=_callback(_x,_y,_op);
    char buffer[64];
    snprintf(buffer,sizeof(buffer),"%d %c %d = %d",_x,_op,_y,result);
    return buffer;
}

当消费者线程调用这个重载的时候就会打印这么一句话来作为标记,那么同样的道理当生产者线程创建这个对象的时候也得打印一些内容来作为标记,所以我们还得创建一个函数用来作为生产者线程的标记:

string toTaskString()
{
    char buffer[64];
    snprintf(buffer,sizeof(buffer),"%d %c %d = ?",_x,_op,_y);
    return buffer;
}

这样我们的类就创建完成了,但是在创建这个类对象的时候得创建传递一个函数调用,那么这个函数还得来实现,函数的功能就是计算两个数据的+ - * / %的结构,函数需要三个参数两个用来表示数据一个用来表示计算的操作符:

int mymath(int x,int y,char oper)
{}

那么在函数里面我们就可以创建一个变量result用来计算结果,然后通过oper的值来创建一个switch语句根据不同的值来执行不同的case语句,然后在case语句里面将计算结构赋值给result变量然后将其返回即可,那么这里的代码如下:

int mymath(int x,int y,char oper)
{
    int result=0;
    switch(oper)
    {
        case '+':
            result=x+y;
            break;
        case '-':
            result=x-y;
            break;
        case '*':
            result=x*y;
            break;
        case '/':
            result=x/y;
            break;
        case '%':
            result=x%y;
            break;
        default:
            break;
    }
    return result;
}

但是这里存在一个问题就是不能出现除以0或者%0的表达式所以在%和/的case语句里面就得判断一下如果y的值为0的话就使用打印出现了什么错误,然后将result的值赋值为-1并break结束,这样当结果出现-1的时候我们就知道是正常计算得到的-1还是出现错误得到的-1,那么函数的实现如下:

int mymath(int x,int y,char oper)
{
    int result=0;
    switch(oper)
    {
        case '+':
            result=x+y;
            break;
        case '-':
            result=x-y;
            break;
        case '*':
            result=x*y;
            break;
        case '/':
            if(y==0)
            {
                cerr << "div zero error!" << endl;
                result = -1;
                break;
            }
            result=x/y;
            break;
        case '%':
            if(y==0)
            {
                cerr<<"div zero error!"<<endl;
                result=-1;
                break;
            }
            result=x%y;
            break;
        default:
            break;
    }
    return result;
}

将这个完成之后就可以来看看线程函数该如何实现,首先是生产者线程同样的原理先将参数的类型进行修改:

void *poductor_func(void* args)
{
    BlockQueue<CalTask>*bq=static_cast<BlockQueue<CalTask>*>(args);
    return nullptr;
}

然后依然是创建一个while循环,在循环里面根据rand函数得到x和y的值,那如何来得到随机的操作符呢?方法也很简单我们可以创建一个全局的string对象对象里面存放+ - * / %,然后就可以根据rand函数来随机得到string对象的下标,根据该下标就可以得到对应的操作符,有了数据和操作符之后就可以创建一个TaskCal类对象并将该对象插入到队列并使用对象中的toTaskString函数来打印内容出来作为标记:

void *poductor_func(void* args)
{
    BlockQueue<CalTask>*bq=static_cast<BlockQueue<CalTask>*>(args);
    while(true)
    {
        int x=rand()%10;
        int y=rand()%10;
        int z=rand()%oper.size();
        CalTask T(x,y,oper[z],mymath);
        cout<<"生产任务:"<<T.toTaskString()<<endl;
        bq->push(T);
        sleep(1);
    }
    return nullptr;
}

那么对于的消费者线程就是创建一个名为data的对象并将该对象的地址传递给pop函数,这样就可以调用data的函数操函数调用重载打印出一些内容,那么这里的代码如下:

void *consumer_func(void * args)
{
    BlockQueue<CalTask>*bq=static_cast<BlockQueue<CalTask>*>(args);
    while(true)
    {
        CalTask data;
        bq->pop(&data);
        cout<<"消费任务:"<<data()<<endl;
    }
    return nullptr;
}

那么完整的代码如下:

//task.hpp
#include<iostream>
#include<string>
#include<functional>
#include<cstdio>
using namespace std;
const string oper="+-*/%";
class CalTask
{
    using func_t=function<int(int,int,char)>;
public:
    CalTask()
    {}
    CalTask(int x,int y,char op,func_t func)
    :_x(x)
    ,_y(y)
    ,_op(op)
    ,_callback(func)
    {}
    string operator()()
    {
        int result=_callback(_x,_y,_op);
        char buffer[64];
        snprintf(buffer,sizeof(buffer),"%d %c %d = %d",_x,_op,_y,result);
        return buffer;
    }
    string toTaskString()
    {
        char buffer[64];
        snprintf(buffer,sizeof(buffer),"%d %c %d = ?",_x,_op,_y);
        return buffer;
    }
private:
    int _x;
    int _y;
    char _op;
    func_t _callback;
};
int mymath(int x,int y,char oper)
{
    int result=0;
    switch(oper)
    {
        case '+':
            result=x+y;
            break;
        case '-':
            result=x-y;
            break;
        case '*':
            result=x*y;
            break;
        case '/':
            if(y==0)
            {
                cerr << "div zero error!" << endl;
                result = -1;
                break;
            }
            result=x/y;
            break;
        case '%':
            if(y==0)
            {
                cerr<<"div zero error!"<<endl;
                result=-1;
                break;
            }
            result=x%y;
            break;
        default:
            break;
    }
    return result;
}
//MainCp.cc
#include<iostream>
#include<unistd.h>
#include"blockqueue.hpp"
#include"Task.hpp"
using namespace std;
void *consumer_func(void * args)
{
    BlockQueue<CalTask>*bq=static_cast<BlockQueue<CalTask>*>(args);
    while(true)
    {
        CalTask data;
        bq->pop(&data);
        cout<<"消费任务:"<<data()<<endl;
    }
    return nullptr;
}
void *poductor_func(void* args)
{
    BlockQueue<CalTask>*bq=static_cast<BlockQueue<CalTask>*>(args);
    while(true)
    {
        int x=rand()%10;
        int y=rand()%10;
        int z=rand()%oper.size();
        CalTask T(x,y,oper[z],mymath);
        cout<<"生产任务:"<<T.toTaskString()<<endl;
        bq->push(T);
        sleep(1);
    }
    return nullptr;
}
int main()
{
    srand((unsigned long)time(nullptr));
    pthread_t consumer,poductor;
    BlockQueue<CalTask>* bq =new BlockQueue<CalTask>();
    pthread_create(&consumer,nullptr,consumer_func,(void*) bq);
    pthread_create(&poductor,nullptr,poductor_func,(void*) bq);
    pthread_join(consumer,nullptr);
    pthread_join(poductor,nullptr);
    return 0;
}

代码的运行结果如下:
在这里插入图片描述
可以看到这里的运行结构明显就比之前丰富多了,所以未来我们想要实现更多的功能就只用设计对应的类与其对数据的处理函数即可。

程序的改进三

我们可以用下面的图片来描述一下上面写的代码:
在这里插入图片描述
上面的代码就是创建了一个生产者线程和一个消费者线程然后通过阻塞队列的形式将数据进行传递,那么这就只创建一个了阻塞队列,我们接下来改进的方式就是创建多个阻塞队列,也就是将上面的图片变成下面这样:
在这里插入图片描述
我们让其中的一个线程即使生产者也是消费者,所以我们可以将上面的程序修改成这样,第一个线程产生任务将任务放到阻塞队列1中,然后第二个线程从阻塞队列1中获取任务并执行任务将任务执行的结果打印到屏幕上的同时也将执行的结果放到阻塞队列二当中,然后线程3从阻塞队列二中获取结果并将结果保存到一个文件里面,那么这就是程序改进的大致样貌,那么接下来我们就一步一步的执行。之前我们说过未来我们想要实现更多的功能就只用设计对应的类和函数即可,而保存数据和计算数据是两个不同的功能,所以我们这里就创建一个新类用来传递保存数据这个功能所需要的数据和方法,首先类中得有一个string对象来保存数据,还得有一个function对象来存储对应的函数调用,这个函数的返回值是void参数是string的引用其意义就是将string中的内容保存到文件当中:

class SaveTask
{
    using func_t =function<void(string&)>;
public:

private:
    string _message;
    func_t _func;
};

然后就要实现这个函数的构造函数,对于构造函数就需要两个参数一个用来接收string对象一个参数用来接收函数对象将这两个对象赋值给两个类内数据即可,然后还得创建一个什么都不用干的默认构造函数:

SaveTask()
{}
SaveTask(string& message,func_t func)
:_message(message)
,_func(func)
{}

然后还创建一个函数调用的操作符重载,在这个函数里面调用类内的function对象即可:

class SaveTask
{
    using func_t =function<void(string&)>;
public:
    SaveTask()
    {}
    SaveTask(string& message,func_t func)
    :_message(message)
    ,_func(func)
    {}
    void operator()()
    {
        _func(_message);
    }
private:
    string _message;
    func_t _func;
};

有了这个类之后我们就可以将数据放到阻塞队列里面,那么接下来我们就要实现对应的存储方法,首先这个函数跟类中的function对象是保持一致的返回值为void参数是string的引用(这里的顺序应该是先有函数然后再有function对象):

void Save(string& message)
{}

那么函数的第一步就是创建string对象记录要打开文件的相对位置,然后使用fopen函数以追加的方式打开这个文件,因为文件的打开可能会出现失败的情况,所以这里得加个if语句进行判断,如果成功了就调用fputs函数将数据写入到文件里面
在这里插入图片描述
然后就可以使用fclose函数将文件给关掉:

void Save(string& message)
{
    const string target_file="./log.txt";
    FILE* fp=fopen(target_file.c_str(),"a+");
    if(fp==nullptr)
    {
        cerr<<"open error"<<endl;
        return;
    }
    fputs(message.c_str(),fp);
    fputs("\n",fp);//因为futs不会自带/n所以加上这个方便查看
    fclose(fp);
}

有了对应的类对象和函数之后就可以修改对应的线程函数和main函数,首先来修改main函数中的内容因为上图中存在三个线程和两个阻塞队列,并且中间的线程得同时访问两个阻塞队列但是执行线程函数的时候只能传递一个参数,所以这里可以创建一个新的类类中有两个阻塞队列的指针,因为阻塞队列是有模版的并且类中的两个队列存储的数据类型可能是不一样的所以新创建的类也得是模版类并且模版中有两个参数:

template<class C,class S>//c表示计算的类,s表示存储的类
class BlockQueues
{
public:
    BlockQueue<C>* c_bq;
    BlockQueue<S>* s_bq;    
};

有了这个类之后在main函数里面就可以只创建一个这样的对象即可,然后创建三个线程让其执行三个不同的函数:

int main()
{
    srand((unsigned long)time(nullptr));
    pthread_t consumer,poductor,saver;//创建用于保存的线程
    BlockQueues<CalTask,SaveTask>* bqs =new BlockQueues<CalTask,SaveTask>();
    bqs->c_bq=new BlockQueue<CalTask>();
    bqs->s_bq=new BlockQueue<SaveTask>();
    pthread_create(&consumer,nullptr,consumer_func,(void*) bqs);
    pthread_create(&poductor,nullptr,poductor_func,(void*) bqs);
    pthread_create(&saver,nullptr,saver_func,(void*) bqs);
    pthread_join(consumer,nullptr);
    pthread_join(poductor,nullptr);
    pthread_join(saver,nullptr);
    return 0;
}

对于生产者函数唯一的变化就是在参数类型转换的时候,我们先将参数强制转换成为BlockQueues*类型这样就可以合法的获取对象中的数据,因为消费者线程要需要的是装载CalTask的队列,所以就创建一个BlockQueue<CalTask>*的指针对象,让其指向BlockQueues中的c_bq:

void *poductor_func(void* args)
{
    BlockQueue<CalTask>*cal_bq=(static_cast<BlockQueues<CalTask,SaveTask>*>(args))->c_bq;
    while(true)
    {
        int x=rand()%10;
        int y=rand()%10;
        int z=rand()%oper.size();
        CalTask T(x,y,oper[z],mymath);
        cout<<"生产任务:"<<T.toTaskString()<<endl;
        cal_bq->push(T);
        sleep(1);
    }
    return nullptr;
}

对于中间的消费者线程不仅需要CalTask的阻塞队列还需要SaveTask的阻塞队列,所以在consumer_func中得创建两个指针对象并执行两次强制类型转换:

BlockQueue<CalTask>*cal_bq=(static_cast<BlockQueues<CalTask,SaveTask>*>(args))->c_bq;
BlockQueue<SaveTask>*save_bq=(static_cast<BlockQueues<CalTask,SaveTask>*>(args))->s_bq;

然后就是先从cal_bq中获取任务将任务执行的结果打印保存下来打印之后便创建一个SaveTask对象并调用save_bq中的push函数将对象和函数放到save_bq里面即可:

void *consumer_func(void * args)
{
    BlockQueue<CalTask>*cal_bq=(static_cast<BlockQueues<CalTask,SaveTask>*>(args))->c_bq;
    BlockQueue<SaveTask>*save_bq=(static_cast<BlockQueues<CalTask,SaveTask>*>(args))->s_bq;
    while(true)
    {
        CalTask data;
        cal_bq->pop(&data);
        cout<<"完成任务:"<<data()<<endl;
        string result=data();
        SaveTask save_result(result,Save);
        save_bq->push(save_result);
    }
    return nullptr;
}

最后就是saver_func函数也是创建一个while然后创建一个savetask对象,然后调用save_bq中的pop函数将数据放到savetask对象里面即可,然后调用该对象的函数调用重载即可,那么这里的代码如下:

void*saver_func(void*args)
{
    BlockQueue<SaveTask>*save_bq=(static_cast<BlockQueues<CalTask,SaveTask>*>(args))->s_bq;
    while(true)
    {
        SaveTask result;
        save_bq->pop(&result);  
        result();
        cout<<"保存任务"<<endl;
    }
    return nullptr;
}

那么完整的代码如下:

//MainCp.cc
#include<iostream>
#include<unistd.h>
#include"blockqueue.hpp"
#include"Task.hpp"
using namespace std;
template<class C,class S>//c表示计算的类,s表示存储的类
class BlockQueues
{
public:
    BlockQueue<C>* c_bq;
    BlockQueue<S>* s_bq;    
};
void *consumer_func(void * args)
{
    BlockQueue<CalTask>*cal_bq=(static_cast<BlockQueues<CalTask,SaveTask>*>(args))->c_bq;
    BlockQueue<SaveTask>*save_bq=(static_cast<BlockQueues<CalTask,SaveTask>*>(args))->s_bq;
    while(true)
    {
        CalTask data;
        cal_bq->pop(&data);
        cout<<"完成任务:"<<data()<<endl;
        string result=data();
        SaveTask save_result(result,Save);
        save_bq->push(save_result);
    }
    return nullptr;
}
void *poductor_func(void* args)
{
    BlockQueue<CalTask>*cal_bq=(static_cast<BlockQueues<CalTask,SaveTask>*>(args))->c_bq;
    while(true)
    {
        int x=rand()%10;
        int y=rand()%10;
        int z=rand()%oper.size();
        CalTask T(x,y,oper[z],mymath);
        cout<<"生产任务:"<<T.toTaskString()<<endl;
        cal_bq->push(T);
        sleep(1);
    }
    return nullptr;
}
void*saver_func(void*args)
{
    BlockQueue<SaveTask>*save_bq=(static_cast<BlockQueues<CalTask,SaveTask>*>(args))->s_bq;
    while(true)
    {
        SaveTask result;
        save_bq->pop(&result);  
        result();
        cout<<"保存任务"<<endl;
    }
    return nullptr;
}

int main()
{
    srand((unsigned long)time(nullptr));
    pthread_t consumer,poductor,saver;//创建用于保存的线程
    BlockQueues<CalTask,SaveTask>* bqs =new BlockQueues<CalTask,SaveTask>();
    bqs->c_bq=new BlockQueue<CalTask>();
    bqs->s_bq=new BlockQueue<SaveTask>();
    pthread_create(&consumer,nullptr,consumer_func,(void*) bqs);
    pthread_create(&poductor,nullptr,poductor_func,(void*) bqs);
    pthread_create(&saver,nullptr,saver_func,(void*) bqs);
    pthread_join(consumer,nullptr);
    pthread_join(poductor,nullptr);
    pthread_join(saver,nullptr);
    return 0;
}
//Task.hpp
#include<iostream>
#include<string>
#include<functional>
#include<cstdio>
using namespace std;
const string oper="+-*/%";
class CalTask
{
    using func_t=function<int(int,int,char)>;
public:
    CalTask()
    {}
    CalTask(int x,int y,char op,func_t func)
    :_x(x)
    ,_y(y)
    ,_op(op)
    ,_callback(func)
    {}
    string operator()()
    {
        int result=_callback(_x,_y,_op);
        char buffer[64];
        snprintf(buffer,sizeof(buffer),"%d %c %d = %d",_x,_op,_y,result);
        return buffer;
    }
    string toTaskString()
    {
        char buffer[64];
        snprintf(buffer,sizeof(buffer),"%d %c %d = ?",_x,_op,_y);
        return buffer;
    }
private:
    int _x;
    int _y;
    char _op;
    func_t _callback;
};
int mymath(int x,int y,char oper)
{
    int result=0;
    switch(oper)
    {
        case '+':
            result=x+y;
            break;
        case '-':
            result=x-y;
            break;
        case '*':
            result=x*y;
            break;
        case '/':
            if(y==0)
            {
                cerr << "div zero error!" << endl;
                result = -1;
                break;
            }
            result=x/y;
            break;
        case '%':
            if(y==0)
            {
                cerr<<"div zero error!"<<endl;
                result=-1;
                break;
            }
            result=x%y;
            break;
        default:
            break;
    }
    return result;
}
class SaveTask
{
    using func_t =function<void(string&)>;
public:
    SaveTask()
    {}
    SaveTask(string& message,func_t func)
    :_message(message)
    ,_func(func)
    {}
    void operator()()
    {
        _func(_message);
    }
private:
    string _message;
    func_t _func;
};

void Save(string& message)
{
    const string target_file="./log.txt";
    FILE* fp=fopen(target_file.c_str(),"a+");
    if(fp==nullptr)
    {
        cerr<<"open error"<<endl;
        return;
    }
    fputs(message.c_str(),fp);
    fputs("\n",fp);//因为futs不会自带/n所以加上这个方便查看
    fclose(fp);
}

代码的运行的结果如下:
在这里插入图片描述
在这里插入图片描述
当前路径下有一个名为log.txt的文件,文件的内容如下:
在这里插入图片描述
那么这就是我们对程序三的另外一个改进。

多生产多消费模拟实现

通过上面的学习我们知道了如何实现单生产单消费模型的实现,那么接下来我们就要模拟实现多生产多消费的模型,首先得对main函数中的代码进行修改,使用数组记录多个线程的id,然后使用循环依让这些线程执行对应的函数,最后使用循环将其进行回收:

int main()
{
    srand((unsigned long)time(nullptr));
    pthread_t consumer[5],poductor[5],saver;//创建用于保存的线程
    BlockQueues<CalTask,SaveTask>* bqs =new BlockQueues<CalTask,SaveTask>();
    bqs->c_bq=new BlockQueue<CalTask>();
    bqs->s_bq=new BlockQueue<SaveTask>();
    for(int i=0;i<5;i++)
    {
        pthread_create(&consumer[i],nullptr,consumer_func,(void*) bqs);
        pthread_create(&poductor[i],nullptr,poductor_func,(void*) bqs);
    }
    pthread_create(&saver,nullptr,saver_func,(void*) bqs);
    for(int i=0;i<5;i++)
    {
        pthread_join(consumer[i],nullptr);
        pthread_join(poductor[i],nullptr);
    }
    pthread_join(saver,nullptr);
    return 0;
}

然后我们试着将其运行一下看看能不能运行成功:
在这里插入图片描述
文件中保存的内容如下:
在这里插入图片描述
仔细的对比一下发现这里无论是计算结果还是任务被处理的顺序都是对的上的,那么这就说明多生产多消费的模型实现的是正确的,而我们之前说这样的模型要保证消费者和消费者之间是互斥的,生产者和生产者之间是互斥的,那这里是如何做到的呢?答案是pop函数和push函数中对同一把锁的竞争,当多个生产者争夺同一锁的时候生产者之间就是互斥的,多个消费者争夺同一把锁的时候消费者之间就是互斥的,而生产者和消费者之间争夺的也是同一把所以生产者和消费者之间也是互斥的,所以上述的代码在实现单生产和单消费的时候就已经达到了多生产和多消费的功能。

生产者消费者模型的意义

在之前的学习中我们总是听到这么一个观点:该模型能够提高效率,但是通过上面的分析我们可以看到因为锁的存在不管当前生产者中存在多少个线程只有一个线程能向队列里面放数据,不管创建了多少个生产者线程也只有一个线程能从队列当中获取数据,并且生产者放置数据和消费者获取数据的操作还是互斥的,那这个高效究竟高效在哪呢?比如说下面的图片:
在这里插入图片描述
那么要想知道这个问题首先我们的目光就不能停留在阻塞队列上面,我们首先来看看生产者将任务放到阻塞队列这件事,这个任务是怎么来的呢?答案是生产者制造的获取到的,那获取任务构造任务的时候需要消耗时间吗?答案是肯定需要的毕竟任务不是凭空而来的,其次我们再看看消费者将任务取出来这件事,消费者获取任务之后难道就什么事情都不做了吗?肯定不是的消费者获取任务之后也得对任务做出一些操作,这些操作肯定是需要消耗时间的,所以该模型的高效并不是存数据拿数据的高效,而是我们可以让多个线程并发处理多个任务的同时其他的线程还可以从队列中拿数据存放数据,也就是下面图片这样:
在这里插入图片描述
也就是在存放任务之前和拿取任务之后,让生产者线程并发执行让消费者并发执行从而达到高效的目的。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1181593.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

什么是CCS Concepts

在撰写论文时&#xff0c;看到了CCS Concepts&#xff0c;注意这是对自己论文的分类&#xff0c;不能随便填写。 在ACM的网页"http://dl.acm.org/ccs/ccs.cfm"中选择自己论文的分类&#xff1a; 然后点击左侧的“Assign This CCS Concept”&#xff0c;再选择相关性…

【TDK 电容 】介电质 代码 对应温度及变化率

JB 电解质是什么&#xff1f;没找到&#xff0c;只有TDK有&#xff0c;也只有这个温度的区别&#xff0c;并且已经停产在售。 对比发现是mouser网站关于电容的描述错误。下图显示正确的&#xff0c;再然后是错误的。 在TDK官网&#xff0c;这样的描述 温度特性 分类标准代码温…

制作电子画册的有好帮手---FLBOOK

随着互联网的发展&#xff0c;越来越多的人开始使用电子书来阅读书籍。而将PDF文件转换成在线翻页电子书&#xff0c;则是一种非常方便的方式。今天&#xff0c;给大家推荐一个可以将PDF转在线翻页电子书的网站。 这个网站就是FLBOOK在线制作电子杂志平台&#xff0c;只需要三步…

C++——类和对象(初始化列表、匿名对象、static成员、类的隐式类型转换和explicit关键字、内部类)

初始化列表、匿名对象、static成员、类的隐式类型转换和explicit关键字、内部类 本章思维导图&#xff1a; 注&#xff1a;本章思维导图对应的xmind文件和.png文件都已同步导入至资源 文章目录 初始化列表、匿名对象、static成员、类的隐式类型转换和explicit关键字、内部类1.…

案例-注册页面(css)

html页面用css控制样式&#xff0c;画一个注册页面。 页面最终效果如下&#xff1a; 页面代码&#xff1a; <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>注册页面</title> <style>*{…

文献阅读 - JADE:具有可选外部存档的自适应差分进化

文章目录 标题摘要关键字结论研究背景I. INTRODUCTION 常用基础理论知识II. BASIC OPERATIONS OF DEIII. ADAPTIVE DE ALGORITHMSA. DESAPB. FADEC. SaDED. jDE 研究内容、成果IV. JADEA. DE/Current-to-pbestB. Parameter AdaptationC. Explanations of the Parameter Adaptat…

WSGI与ASGI:两种Python Web服务器网关接口的比较

在当今的Web开发领域&#xff0c;选择合适的服务器网关接口&#xff08;Server Gateway Interface&#xff0c;简称SGI&#xff09;对于提高Web应用程序的性能和并发性至关重要。在Python中&#xff0c;有两种常见的SGI&#xff1a;WSGI和ASGI。本文将深入探讨这两种SGI的异同点…

中国人民大学与加拿大女王大学金融硕士——在职读研,让人生的火花迸发

每个人都像是一块未经雕琢的宝石&#xff0c;隐藏着无尽的光芒。然而&#xff0c;生活、工作中的困难、挫折和压力&#xff0c;就像尘土一样&#xff0c;掩盖了我们的闪亮之处。只有当我们冲破这些阻碍&#xff0c;才能让内在的光芒照亮世界。中国人民大学与加拿大女王大学金融…

Q-Vision+CANpro Max总线解决方案

智能联网技术在国内的发展势头迅猛&#xff0c;随着汽车智能化、网联化发展大潮的到来&#xff0c;智能网联汽车逐步成为汽车发展的主要趋势。越来越多整车厂诉求&#xff0c;希望可以提供本土的测量软件&#xff0c;特别是关于ADAS测试。而风丘科技推出的Q-Vision软件不仅可支…

一键批量剪辑:视频随机分割新玩法,高效剪辑不再难

随着视频内容的日益丰富&#xff0c;人们对于视频剪辑的需求也日益增长。而传统的视频剪辑方式往往需要耗费大量的时间和精力&#xff0c;让许多非专业人士望而却步。然而&#xff0c;现在有一款名为“云炫AI智剪”的软件&#xff0c;它为我们提供了一种全新的视频剪辑方式——…

数据结构:AVL树的旋转(平衡搜索二叉树)

1、AVL树简介 AVL树是最先发明的自平衡二叉查找树。在AVL树中任何节点的两个子树的高度最大差别为1&#xff0c;所以它也被称为高度平衡树。增加和删除可能需要通过一次或多次树旋转来重新平衡这个树。AVL树得名于它的发明者G. M. Adelson-Velsky和E. M. Landis&#xff0c;他们…

uniapp原生插件之安卓串口操作原生插件

插件介绍 安卓串口操作原生插件&#xff0c;支持设置串口&#xff0c;波特率&#xff0c;停止位&#xff0c;数据位&#xff0c;校验位&#xff0c;流控以及延迟&#xff0c;支持粘包处理解决分包问题&#xff0c;支持多串口操作&#xff0c;无需root 插件地址 安卓串口操作…

2023年【危险化学品经营单位安全管理人员】考试资料及危险化学品经营单位安全管理人员考试试卷

题库来源&#xff1a;安全生产模拟考试一点通公众号小程序 2023年危险化学品经营单位安全管理人员考试资料为正在备考危险化学品经营单位安全管理人员操作证的学员准备的理论考试专题&#xff0c;每个月更新的危险化学品经营单位安全管理人员考试试卷祝您顺利通过危险化学品经…

【深度神经网络(DNN)】实现车牌识别

文章目录 前言一、数据集介绍二、步骤1.导包2.参数配置3.数据处理4.模型定义5.模型训练6.模型预测 总结 前言 课内实践作业 车牌识别 一、数据集介绍 1.车牌识别数据集&#xff1a;VehicleLicense车牌识别数据集包含16151张单字符数据&#xff0c;所有的单字符均为严格切割且…

力扣:67.二进制求和

class Solution { public:string addBinary(string a, string b) {string ans;reverse(a.begin(), a.end()); // 将字符串a反转reverse(b.begin(), b.end()); // 将字符串b反转int n max(a.size(), b.size()), carry 0; // 获取a和b的最大长度&#xff0c;并初始化进位为0for…

Python合并拼接图片

目录 图片二维合并拼接&#xff08;类似九宫格&#xff09;图片纵向合并拼接举例18张图片合并为2张九宫格图片18张图片合并为2张纵向图片 使用前需要安装PIL库&#xff0c;以下代码使用的Pillow(10.1.0) pip install pillow图片二维合并拼接&#xff08;类似九宫格&#xff09…

趣玩行为商城:用智能消费行为开启财富新生活!

随着当前的消费主力转移至90后和Z时代&#xff0c;购物和消费习惯已经发生了翻天覆地的变化。以往那种大超市&#xff0c;线下大卖场、超级Mall综合体逐渐式微&#xff0c;以“健康生活”、“智能消费”“社区直达”为主的消费理念逐渐兴盛&#xff0c;消费者开始更多地关注此类…

sqli-labs-1

文章目录 Less-01Less-02Less-03Less-04 Less-01 1.输入不同的id值&#xff0c;可以获取不同的用户信息&#xff1a; 2.在sql后拼接’or11–&#xff0c;并没有获取到所有用户的信息&#xff0c;猜测可能用了limit语句 3.构造错误的sql语句&#xff0c;果然有limit报错: …

简历考察点1_《基于 VUE2.0 前后端分离的个人博客系统》

项目名称&#xff1a;《基于 Vue2.0①前后端分离的个人博客系统》 项目描述&#xff1a;提供最新技术资讯、开发知识和分享的博客平台&#xff0c;功能模块包括&#xff1a;支持不同用户登录注册、编辑并发布博客、上传图片、评论留言、根据访问量查询最热文章和标签、根据日期…

VM虚拟机逆向---[羊城杯 2021]Babyvm 复现【详解】

文章目录 前言题目分析汇编脚本分析汇编exp 后言 前言 无 题目分析 &unk_804B0C0里面是opcode&#xff0c;sub_1C8里面有个mprotect&#xff0c;用了一个SMC加密。 我使用的是动态调试&#xff0c;因为是ELF文件&#xff0c;链接一下linux&#xff0c;进行动调&#xff…