生产者消费者模式思路就是:一批专门生产资源的线程 和一批专门处理资源的线程以及一个线程安全的任务队列组成的 。并且当任务队列满的时候阻塞生产线程,任务队列为空的时候阻塞消费线程。
要实现一个生产者消费者队列
1。需要实现线程同步,访问任务队列互斥
所以需要用到
条件变量 ,互斥锁
条件变量
wait 阻塞 当前线程 当满足条件的时候 调用 notify_one或notify_all 唤醒阻塞线程
当收到notify_one 唤醒一个线程
当收到notify_all唤醒所有线程
2.创建多个线程
thread t1(“func”);
“func” 为线程调用的处理函数
3.任务队列
可以使用queue实现
简单的生产者消费者模式实现需要有 以下几个方法
1.push 方法 用来给任务队列中添加任务,生产者任务调用
思路:
先加锁 判断当前队列size是否等于max_task_count(提前设置的最大任务数)
如果相等表示任务队列满了,则调用消息变量的wait()来阻塞生产者线程
如果任务队列没满则添加任务到消息队列中 并且唤醒消费者线程。(因为生产者如果成功生产了资源,消费者就可以在任务队列中拿到资源了)
2.pop 方法 用来从任务队列中取出任务,消费者任务调用
先加锁
判断队列是否为null
如果为空阻塞消费者线程
如果不为空 则获取一个任务,然后将这个任务在队列中删除
唤醒生产者线程;
3.pro_task方法 生产者任务方法(生产线程调用),调用push方法,向任务队列中添加任务
producer_count表示生产者生产了多少个任务
pro_count表示一共有多少个任务
while(1){
加锁
调用push方法添加一个任务
生成一个任务 使用了 function<void()> task 和bind(func,parm);
解锁
}
4.con_task方法 消费者任务方法(消费线程调用),调用pop方法,从任务队列中取出任务,执行任务。
while(1){
加锁
调用pop方法获取一个任务。
执行任务;
解锁
}
#include<mutex>
#include<thread>
#include<condition_variable>
#include<queue>
#include<iostream>
#include<functional>
#include<Windows.h>
using namespace std;
typedef function<void()> Task;
int max_task_count;//队列最大容量
int pro_count;//需要生产的资源数
int producer_count;//生产者已经生产数量
int consumer_count;//消费者消费数量
queue<Task> resouce;//任务队列
mutex m_mtx;//用来保护任务队列
mutex m_pro_mtx;//保护生产线程任务
mutex m_con_mtx;//保护消费线程任务
condition_variable pro_con;//实现生产线程同步
condition_variable con_con;//实现消费线程同步
void init(int max_task_num, int pro_num) {
max_task_count = max_task_num;
pro_count = pro_num;
producer_count = 0;
consumer_count = 0;
}
void func(int i) {
cout << "task finish" << "-----" << i << endl;
}
//添加任务
void push(Task& task)
{
//生产者给队列中添加任务
unique_lock<mutex> lck(m_mtx);
while (resouce.size() == max_task_count)
{
cout << "队列已经满了,生产者等待" << endl;
pro_con.wait(lck);
}
resouce.push(task);
//唤醒消费者线程
con_con.notify_all();
}
//删除任务
Task pop()
{
//消费者从队列中取出任务
unique_lock<mutex>lck(m_mtx);
while (resouce.size() == 0)
{
cout << "队列为空,消费者等待" << endl;
con_con.wait(lck);
}
Task task = resouce.front();
resouce.pop();
//唤醒生产者线程
pro_con.notify_all();
return task;
}
//生产者
void proTask()
{
while (1) {
unique_lock<mutex> lck(m_pro_mtx);
if (producer_count < pro_count) {
producer_count++;
Task task = bind(func, producer_count);
push(task);
cout << "生产者生成任务" << endl;
}
lck.unlock();
}
}
//消费者
void conTask()
{
while (1) {
unique_lock<mutex> lck(m_con_mtx);
if (consumer_count < pro_count) {
consumer_count++;
Task task = pop();
cout << "消费者消费执行任务" << endl;
task();
}
lck.unlock();
}
}
int main()
{
init(10, 20);
thread pro1(proTask);
thread pro2(proTask);
thread pro3(proTask);
thread con1(conTask);
thread con2(conTask);
thread con3(conTask);
pro1.join();
pro2.join();
pro3.join();
con1.join();
con2.join();
con3.join();
return 0;
}