目录
1.模型前提
2.阻塞队列(消费场所)
3. 实验
4.有关效率
1.模型前提
以单生产者对单消费者为例子:
前提一:有一个缓冲区作为消费场所。
前提二:有两种功能不同的线程分别具有消费与生产的能力。
前提三:生产者与生产者之间有互斥的关系,消费者与消费者之间有互斥的关系,生产者与消费者之间同时具有互斥与同步的关系。
2.阻塞队列(消费场所)
阻塞队列与普通队列最大的不同是:当队列满时生产者不能继续生产,当队列空时消费者不能继续消费。
3. 实验
来展示一份多线程工作遵循生产者消费者模型的代码。
部分1:设置消费的模板(task.hpp)
#pragma once
#include <iostream>
#include <functional>
typedef std::function<int(int, int)> func_t;//c++11特性打包器
class Task
{
public:
Task(){}
Task(int x, int y, func_t func):x_(x), y_(y), func_(func)
{}
int operator ()() //仿函数
{
return func_(x_, y_);
}
public:
int x_;
int y_;
func_t func_;
};
部分2:采用raii的方式设置锁的处置方式(lockGuard.hpp)
#pragma once
#include <iostream>
#include <pthread.h>
class Mutex
{
public:
Mutex(pthread_mutex_t *mtx):pmtx_(mtx)
{}
void lock()
{
std::cout << "要进行加锁" << std::endl;
pthread_mutex_lock(pmtx_);
}
void unlock()
{
std::cout << "要进行解锁" << std::endl;
pthread_mutex_unlock(pmtx_);
}
~Mutex()
{}
private:
pthread_mutex_t *pmtx_;
};
// RAII风格的加锁方式
class lockGuard
{
public:
//构造时加锁
//析构时解锁
lockGuard(pthread_mutex_t *mtx):mtx_(mtx)
{
mtx_.lock();
}
~lockGuard()
{
mtx_.unlock();
}
private:
Mutex mtx_; //锁
};
这里解释一下为什么要加锁,因为要维护前提三,也就是要维护互斥的关系。
部分3:阻塞队列这个数据结构
#pragma once
#include <iostream>
#include <queue>
#include <mutex>
#include <pthread.h>
#include "lockGuard.hpp"
const int gDefaultCap = 5;
template <class T>
class BlockQueue
{
private:
//判断队列的空与满
bool isQueueEmpty()
{
return bq_.size() == 0;
}
bool isQueueFull()
{
return bq_.size() == capacity_;
}
public:
BlockQueue(int capacity = gDefaultCap) : capacity_(capacity)
{
pthread_mutex_init(&mtx_, nullptr);
pthread_cond_init(&Empty_, nullptr);
pthread_cond_init(&Full_, nullptr);
}
void push(const T &in) // 生产者
{
lockGuard lockgrard(&mtx_); // 自动调用构造函数
//不停的检查直到队列非满
while (isQueueFull())
pthread_cond_wait(&Full_, &mtx_); //等待时会自动解锁,消费者可以消费
bq_.push(in); //非满时可以插入任务(数据)
pthread_cond_signal(&Empty_); //既然插入的数据,那么队列为空这个条件变量就可以解除了。
} // 自动调用lockgrard 析构函数
void pop(T *out)
{
lockGuard lockguard(&mtx_);
while (isQueueEmpty())
pthread_cond_wait(&Empty_, &mtx_);
*out = bq_.front();
bq_.pop();
pthread_cond_signal(&Full_);
}
~BlockQueue()
{
pthread_mutex_destroy(&mtx_);
pthread_cond_destroy(&Empty_);
pthread_cond_destroy(&Full_);
}
private:
std::queue<T> bq_; // 阻塞队列
int capacity_; // 容量上限
pthread_mutex_t mtx_; // 通过互斥锁保证队列安全
pthread_cond_t Empty_; // 用它来表示bq 是否空的条件
pthread_cond_t Full_; // 用它来表示bq 是否满的条件
};
部分4:主程序部分,创建线程。
#include "BlockQueue.hpp"
#include "Task.hpp"
#include <pthread.h>
#include <unistd.h>
#include <ctime>
int myAdd(int x, int y) //设置了消费方式
{
return x + y;
}
void* consumer(void *args) //消费者
{
BlockQueue<Task> *bqueue = (BlockQueue<Task> *)args;
while(true)
{
// 获取任务
Task t;
bqueue->pop(&t);
// 完成任务
std::cout << pthread_self() <<" consumer: "<< t.x_ << "+" << t.y_ << "=" << t() << std::endl;
// sleep(1);
}
return nullptr;
}
void* productor(void *args) //生产者
{
BlockQueue<Task> *bqueue = (BlockQueue<Task> *)args;
while(true)
{
// 制作任务 -- 不一定是从生产者来的
int x = rand()%10 + 1;
usleep(rand()%1000);
int y = rand()%5 + 1;
Task t(x, y, myAdd);
// 生产任务
bqueue->push(t);
// 输出消息
std::cout <<pthread_self() <<" productor: "<< t.x_ << "+" << t.y_ << "=?" << std::endl;
sleep(1);
}
return nullptr;
}
int main()
{
//生成随机数种子
srand((uint64_t)time(nullptr) ^ getpid() ^ 0x32457);
BlockQueue<Task> *bqueue = new BlockQueue<Task>();
pthread_t c[2],p[2];
//创建线程
pthread_create(c, nullptr, consumer, bqueue);
pthread_create(c + 1, nullptr, consumer, bqueue);
pthread_create(p, nullptr, productor, bqueue);
pthread_create(p + 1, nullptr, productor, bqueue);
//销毁线程
pthread_join(c[0], nullptr);
pthread_join(c[1], nullptr);
pthread_join(p[0], nullptr);
pthread_join(p[1], nullptr);
delete bqueue;
return 0;
}
结果:
没有问题,生产者生产一次消费者就消费一次,并且在这之间存在了加解锁的过程。
4.有关效率
有人可能会疑问“明明还是加锁了,那不是没有提升效率嘛”。
由于生产者与消费之间存在消费场所,就可以做到生产者生产的同时消费者从消费场所拿走数据进行消费。因此提升的不是生产者与消费者之间传递数据的速度,而是提升了生产者生产数据的效率与消费者消费数据的效率。