代码结构
- 任务:这里用一个int类型的taskNumber代替任务
- 任务队列类:封装了任务队列,存,取等操作。
- 生产者工作函数:生产者执行的函数,向任务队列中添加任务,每个生产者生产3个任务
- 消费者工作函数:消费者执行的函数,从任务队列中拿任务,如果5秒内一直没有任务,则销毁
C++实现代码
#include <stdio.h>
#include <iostream>
#include <string>
#include <thread>
#include <mutex>
#include <queue>
#include <condition_variable>
#include <unistd.h>
#include <chrono>
using namespace std;
int taskNumber = 0; //任务
class task_queue{
public:
// 构造函数
task_queue(int maxNum=10){
this->maxNum = maxNum;
}
// 添加任务
void add_task(int i){ //i为生产者编号
myMutex.lock(); // 访问临界资源,加锁
// 这里必须使用while而不是if,考虑以下情况:
// A B两个线程同时因为任务队列满而阻塞,现在来了一个空位置,AB同时解除阻塞(因为我们用的是notify_all()函数唤醒所有线程)
// 这时假设A抢到了互斥锁myMutex并添加任务,之后释放myMutex。这时B又抢到锁,但是还没有空位置,如果用if就会出错。所以需要while循环判断是否有任务
while(q.size() == maxNum){ //如果任务队列已经满了
condFull.wait(myMutex); //条件锁阻塞
}
q.push(++taskNumber); //添加任务
condEmpty.notify_all(); //告知因为没有任务而被阻塞的消费者线程解除阻塞
cout<<"【任务"<<taskNumber<<"】已经被【生产者"<<i<<"】添加到任务队列中"<<endl;
myMutex.unlock(); //解锁
sleep(1);
}
// 弹出任务
bool get_task(int i){ //i为消费者编号
myMutex.lock(); // 访问临界资源,加锁
// 使用while而不是if的原因同上,如果条件锁的唤醒函数使用的是notify_one()函数,理论上可以使用if
while(q.empty()){ //任务队列为空,则等待
cv_status flag =condEmpty.wait_for( myMutex, chrono::seconds(5)); //等待5秒
if(flag == std::__1::cv_status::timeout){ //timeout表示5秒都没有任务
cout<<"【子线程"<<i<<"】退出"<<endl;
myMutex.unlock(); //这里要解锁,不然退出的线程会一直占用互斥锁导致死锁
return false; // 等待5秒都没有任务要执行,则退出线程
}
}
int x = q.front(); //取任务
q.pop();
condEmpty.notify_all(); //告知因为任务队列满而阻塞的生产者解除阻塞
cout<<"【消费者"<<i<<"】正在执行【任务"<<x<<"】......"<<endl;
myMutex.unlock(); //互斥锁解锁
sleep(1);
return true;
}
// 获得当前任务数目
int get_task_num(){
lock_guard<mutex>my_lock_guard(myMutex); //使用lockguard自动释放锁
return (int)q.size();
}
// 获得最大任务数据
int get_task_max_num(){
lock_guard<mutex>my_lock_guard(myMutex); //使用lockguard自动释放锁
return maxNum;
}
private:
queue<int>q; //假设一个int代表一个任务task
int maxNum; //最大任务数
mutex myMutex; // 临界资源互斥锁
condition_variable_any condFull; // 任务队列满条件锁
condition_variable_any condEmpty; // 任务队列空条件锁
};
void producer_task(task_queue &q, int i){ // 生产者工作函数,每个生产者生产3个任务, i(0~4)代表生产者编号
int num=3;
while(num--){
q.add_task(i);
// sleep(1);
}
}
void consumer_task(task_queue &q, int i){ // 消费者工作函数,消费者循环消费任务,如果5秒内没有任务则停止工作
while(1){
bool flag = q.get_task(i);
// sleep(1);
if(flag==false)break;
}
}
int main(){
task_queue q(5); //最大任务数为5
thread producer[5]; // 5个生产者对象
thread consumer[5]; // 5个消费者对象
for(int i=0;i<5;++i){
// producer[i] = thread(&task_queue::add_task, &q, i,i); //给生产者指定任务
// consumer[i] = thread(&task_queue::get_task, &q,i); //给消费者指定任务
producer[i] = thread(producer_task, ref(q),i); //给生产者指定任务
consumer[i] = thread(consumer_task, ref(q),i); //给消费者指定任务
}
for(int i=0;i<5;++i){ //主线程等待子线程执行完毕
producer[i].join();
consumer[i].join();
}
return 0;
}
运行结果:
尾部的运行结果,可以看出任务是按照添加的顺序执行的,在等待5秒之后,线程依次退出