生产者消费者模型
OS经典问题,生产者消费者模型,empty和full还有mutex对应到C++上如何处理看代码即可
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
using namespace std;
//生产者和消费者模型
//缓冲区
queue<int> qe;
//条件变量
condition_variable contidion;
mutex mtx;
void Producer()
{
for (int i = 0; i < 10; i++)
{
//要加括号限制作用域,以便unique_lock调用自身析构函数自动解锁
{
unique_lock<mutex> lock(mtx);
qe.push(i);
//通知消费者来取任务
contidion.notify_one();
cout << "生产者放入任务:" << i << endl;
}
this_thread::sleep_for(100ms);
}
}
void Consumer()
{
while (1)
{
unique_lock<mutex> lock(mtx);
//如果队列为空,必须等待,第二个参数为true,代表不阻塞,false是阻塞
contidion.wait(lock,[]() {
return !qe.empty();
});
int value = qe.front();
qe.pop();
cout << "消费者消费任务:" << value << endl;
}
}
int main()
{
thread t1(Producer);
thread t2(Consumer);
t1.join();
t2.join();
return 0;
}
C++11线程池
中途用到了许多C++11的新特性,具体看注释
#include <iostream>
#include <queue>
#include <thread>
#include <condition_variable>
#include <vector>
#include <mutex>
#include <functional>
using namespace std;
class ThreadPool {
public:
//构造函数
ThreadPool(int numThread) :stop(false) {
for (int i = 0; i < numThread; i++)
{
//emplace_back要放入thread,lambda表达式则是这个thread对象创建的构造函数
threads.emplace_back([this] () {
while (1) {
unique_lock<mutex> lock(mtx);
condition.wait(lock, [this]() {
return !tasks.empty() || stop;//队列为空时等待或者线程停止了不等待
});
//如果线程停止了同时任务队列为空,也不需要取任务了
if (stop && tasks.empty())
return;
//取到任务队列里的任务
function<void()> task(move(tasks.front()));
//既然取到了任务,就从任务队列里弹出这个任务
tasks.pop();
//没有限制作用域就手动解锁
lock.unlock();
//调用取到的任务
task();
}
});
}
}
//析构函数
~ThreadPool() {
{
//stop是共享变量,访问要加锁,单独作用域以便lock销毁时调用自身析构解锁
unique_lock<mutex> lock(mtx);
stop = true;
}
//通知所有线程来把任务取走完成
condition.notify_all();
//依次完成任务
for (auto& t : threads)
{
t.join();
}
}
//可变长参数用法...Args,代表函数中若干个参数
template<class F,class...Args>
//模版中的右值引用是万能引用,能接受左值和右值
void enqueue(F &&f,Args&&... args)
{
//参数列表和函数绑定后,那么得到的函数对象就不需要参数了以及完美转发,左值进左值出右值同理
function<void()> task = bind(forward<F> (f), forward<Args> (args)...);
{
unique_lock<mutex> lock(mtx);
//转成右值减少开销
tasks.emplace(move(task));
}
condition.notify_one();
}
private:
//线程数组
vector<thread> threads;
//任务队列
queue<function<void()>> tasks;
//互斥锁
mutex mtx;
//条件变量
condition_variable condition;
//线程是否停止
bool stop;
};
int main()
{
//线程池中开辟四个线程
ThreadPool pool(4);
for (int i = 0; i < 10; i++)
{
pool.enqueue([i]() {
cout << "任务" << i << "运行中!" << endl;
this_thread::sleep_for(1s);
cout << "任务" << i << "已结束!" << endl;
});
}
return 0;
}