0.关注博主有更多知识
操作系统入门知识合集
目录
1.生产者消费者模型
1.1基于阻塞队列的生产者消费者模型
1.2生产消费模型的效率问题
2.信号量
2.1信号量
2.2基于环形队列的生产者消费者模型
2.3环形队列的生产消费模型的效率问题
3.线程池
3.1线程池的实现
1.生产者消费者模型
生产者消费者模型在多线程编程中是一个很重要的模型,因为它通过一个容器(缓冲区)来解决生产者和消费者之间的强耦合问题,使得生产者与消费者之间做到不直接通讯,而是通过缓冲区来进行通讯,使其达到解耦的效果。所以因为解耦,所以生产者生产的数据不需要直接交给消费者,而是扔到缓冲区里面去,消费者同样不需要向生产者索要数据,而是直接从缓冲区里面拿,这样就平衡了生产者和消费者的处理能力。
抛开生产者消费者模型不谈,我们每天都在使用生产者和消费者的身份在编程,这个每天都在进行的动作就是函数调用。例如main()函数调用func()函数,那么main()作为调用方需要将数据作为实参传递给func()函数的形参,这就是一次生成数据的行为,所以main()函数就是一个生产者;func()作为被调用方需要接收来自调用方的实参,然后在内部做一些处理,这种行为我们称为消费行为,所以func()函数就是一个消费者。那么main()函数与func()函数之间的关系是什么样的呢?是强耦合关系。当main()函数调用func()函数时,main()必须阻塞式的等待func()函数执行结束,而func()函数必须在main()函数没有调用自己时而保持阻塞(虽然这是不对的,但是例子很形象,哈哈哈)。那么我们可以给main()函数和func()函数之间添加一个缓冲区,main()函数将实参不直接传递给func()函数,而是直接扔到缓冲区里面去;func()函数需要的数据不是接收main()函数的实参,而是从缓冲区里面拿:
那么在这里例子当中,就是一个简单的生产者消费者模型,我们难挖掘出main()函数和func()函数之间做到了解耦,所以他们两个是独立的函数,所以可以并发执行,并且main()函数的生产速率可以和func()函数的消费速率不保持一致,所以生产者消费者模型有如下几个优点:
1.生产者与消费者做到了解耦
2.生成者与消费者之间可以并发
3.生产者与消费者的执行速率可以不一样
那么在多线程场景当中,生产者不止一个,消费者不止一个,甚至有可能缓冲区还不止一个。那么我们要保证生产者消费者模型的正确执行我们要做好以下工作:
1.保证三种关系:
1)保证生产者与生产者之间的互斥:当多个生产者线程存在时,必须要保证他们之间访问缓冲区的互斥关系。即当一个线程正在向缓冲区写入数据时,其他缓冲区必须阻塞等待,因为在计算机当中数据是可以被覆盖的,如果生产者之间没有互斥关系,并且缓冲区只有一个有效位置,那么写入的数据一定是最后一个生产者写入的数据,这就是造成了数据不一致问题。
2)保证消费者与消费者之间的互斥:与上面同理,当一个消费者线程正在从缓冲区读取数据时,其他消费者不能同时读取数据,必须阻塞等待。不然会造成数据不完整,就类似于去超市买最后一根火腿肠时,去买这个火腿肠的并不是一个人,那么多个人要买一根火腿肠的时候只能一个人买到,不然多个人就要抢这个火腿肠,最终导致火腿肠"四分五裂"。
3)保证生产者与消费者之间的互斥与同步关系:生产者和消费者的行为都是向缓冲区存取数据,所以要保证他们之间的互斥关机,即在同一时刻只允许一个生产者或一个消费者访问缓冲区,否则和可能发生生产者数据写到一半的时候消费者便把数据读走了,这就会导致数据不完整问题。生产者和消费者之间也必须存在同步关系,如果没有同步关系,那么很可能由于一些优先级的关系,对互斥锁或者信号量的竞争力特别强,导致一直在重复访问缓冲区而不给其他生产者和消费者访问的机会,进而操作饥饿问题导致模型不合理工作,所以必须规定出一个规则,例如生产者生产一个消费者消费一个,或者消费者只有在缓冲区不为空时读取数据,生产者只能在缓冲区不为满时生产数据等等类似这样的规则来保证生产者和消费者之间的同步关系。
2.保证两种角色:即一个正常的生产者消费者模型,必须存在至少一个生产者线程和至少一个消费者线程
3.保证一个缓冲区:生产者和消费者模型将生产者和消费者进行了解耦,那么解耦的本质就是通过缓冲区作为通信介质,所以模型当中必须在生产者和消费者之间存在一个缓冲区。
生产者消费者模型速记口诀:三二一。
1.1基于阻塞队列的生产者消费者模型
我们以阻塞队列来作为生产消费模型当中的缓冲区,那么阻塞队列作为空间大小有限的数据结构,我们可以规定以下生产者和消费者同步规则:
1.当阻塞队列的容量达到上限时,生产者不能再向阻塞队列当中生产数据,即生产者陷入阻塞
2.当阻塞队列的为空时,消费者不能再从阻塞队列当中拿数据,即消费者陷入阻塞
3.只有当阻塞队列既不为空又不为满时,生产者和消费者才能互斥的访问阻塞队列
我们先编写一份上层调用逻辑较为简单的生产消费模型的代码:
// consumerProductor.cpp
#include <iostream>
#include <memory>
#include <ctime>
#include <string>
using namespace std;
#include <pthread.h>
#include "blockQueue.hpp"
using namespace blockqueue;
#include <unistd.h>
/*消费者就是从阻塞队列当中拿数据*/
void *consumerDemo(void *args)
{
blockQueue<int> *bq = static_cast<blockQueue<int> *>(args);
while (true)
{
int data;
bq->pop(&data);
cout << "consumer consumed a data: " << data << endl;
sleep(1);/*让消费者消费的稍微慢一些*/
}
pthread_exit(nullptr);
}
/*生产者就是向阻塞队列当中写数据*/
void *productorDemo(void *args)
{
blockQueue<int> *bq = static_cast<blockQueue<int> *>(args);
while (true)
{
int data = rand() % 100;
bq->push(data);
cout << "productor producted a data: " << data << endl;
}
pthread_exit(nullptr);
}
int main()
{
srand((unsigned int)time(nullptr));
blockQueue<int> *bq = new blockQueue<int>();
pthread_t productor,consumer;
pthread_create(&productor,nullptr,productorDemo,bq);
pthread_create(&consumer,nullptr,consumerDemo,bq);
pthread_join(productor,nullptr);
pthread_join(consumer,nullptr);
return 0;
}
// blockQueue.hpp
#pragma once
#include <queue>
#include <pthread.h>
namespace blockqueue
{
using namespace std;
template <class T>
class blockQueue
{
private:
#define MAXCAP 5 /*缓冲区上限大小,随时可变*/
public:
blockQueue(const size_t &cap = MAXCAP) : _cap(cap)
{
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_productorCond, nullptr);
pthread_cond_init(&_consumerCond, nullptr);
}
~blockQueue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_productorCond);
pthread_cond_destroy(&_consumerCond);
}
/*向阻塞队列(缓冲区)生产数据*/
void push(const T &in)
{
pthread_mutex_lock(&_mutex);
/*如果阻塞队列为满,生产者不能生产,进入自己的条件变量等待*/
while(isFull())
{/*这里必须使用while而不是if*/
/*进入条件变量相当于发生一次线程切换
*然是要将锁释放,让其他线程拥有锁
*这就是为什么需要传入锁的原因*/
pthread_cond_wait(&_productorCond,&_mutex);
}
_q.push(in);
/*生产者生产一个,说明阻塞队列就多一个数据
*所以此时可以唤醒消费者消费*/
pthread_cond_signal(&_consumerCond);
pthread_mutex_unlock(&_mutex);
}
/*从阻塞队列(缓冲区)拿数据
*使用输出型参数*/
void pop(T *out)
{
pthread_mutex_lock(&_mutex);
while(isEmpty())
{
pthread_cond_wait(&_consumerCond,&_mutex);
}
*out = _q.front();
_q.pop();
pthread_cond_signal(&_productorCond);
pthread_mutex_unlock(&_mutex);
}
private:
bool isFull()
{
return _q.size() == _cap;
}
bool isEmpty()
{
return _q.size() == 0;
}
private:
/*以一个队列作为缓冲区*/
queue<T> _q;
/*生产者与消费者之间存在互斥与同步关系
*故定义一把锁、生产者条件变量、消费者条件变量*/
pthread_mutex_t _mutex;
pthread_cond_t _productorCond;
pthread_cond_t _consumerCond;
/*缓冲区的大小*/
size_t _cap;
};
} /*namespace blockqueue ends here*/
无论是生产者、消费者谁先被调度,都将遵守一个同步规则,即阻塞队列为满生产者不能生产,阻塞队列为空消费者不能消费。可以看我们的输出结果是正常的,因为一开始的阻塞队列为空所以消费者不能消费,所以这些消费者都会进入自己的条件变量当中等待;那么此时只能由生产者生产数据,并且唤醒正在阻塞的消费者。但可能因为锁的竞争原因,生产者刚释放锁的时候,消费者被唤醒准备去竞争锁,但是生产者距离锁最近,所以生产者一直得到锁,一直得到锁就会一直生产数据直到把阻塞队列写满,写满之后自己陷入阻塞,进而消费者过来消费。然后消费一个数据之后就会导致阻塞队列多出来一个空位,此时就会唤醒一个生产者,并且由于我们上面的代码强行让消费者的消费速率慢一些,所以最后的现象就是消费者消费一个,生产者生产一个。那么对于上面的头文件当中就是生产消费模型的实现,其中编码的过程中存在两个细节问题:
1.为什么pthread_cond_wait()需要将互斥锁作为参数传递?因为我们之前介绍pthread_cond_wait()接口的作用,即哪个线程调用该接口哪个线程就去条件变量下的阻塞队列当中阻塞,直到被其他线程调用pthread_cond_signal()或pthread_cond_broadcast()唤醒。调用pthread_cond_wait()就相当于发生一次线程切换(由运行态切换到阻塞态,虽然不准确,但是可以这么理解),但是我们也说过线程切换时不会释放锁,所以在线程切换的过程中其他线程依然无法获取锁,依然无法进入临界区,但是pthread_cond_wait()不一样,它必须释放锁,也就是进入条件变量阻塞的线程必须释放锁,以便其他线程获取锁然后进入临界区;当在条件变量的线程被唤醒时,pthread_cond_wait()不会马上退出然后线程继续向下执行,pthread_cond_wait()必须保证线被唤醒的线程拥有锁,然后再退出,然后再向下执行,也就是说线程去条件变量下阻塞、释放锁的过程在pthread_cond_wait()内部完成,处于条件变量下阻塞的线程被唤醒,然后要竞争锁的过程也要在pthread_cond_wait()内部完成。
2.为什么在判断阻塞队列为满或为空时需要使用while语句而不是if语句?其实就是为了避免伪唤醒的情况。pthread_cond_wait()一旦退出,那么就说明线程被唤醒并且拥有锁,那么它就可以直接向下执行。如果判断阻塞队列为空或为满的语句为if时,那么当有多个生产者线程向阻塞队列当中写数据,那么如果最后一个线程被唤醒,但是此时的阻塞队列已经满了,但是由于使用的是if语句,那么这个线程不会再做检查,而是直接向下执行,最终造成缓冲区越界:
所以我们需要使用while语句来判断阻塞队列是否为满,即让每一个从pthread_cond_wait()醒来的线程都判断一次阻塞队列是否为满或为空,如果确实为空或为满,那么就继续去条件变量下等待:
还有一个原因就是万一当pthread_cond_wait()执行错误的时候,即pthread_cond_wait()不正常退出的时候(有可能是释放锁失败),线程依然会向下执行,进而导致缓冲区越界等等问题。那么我们使用while语句就能避免这个问题,原因是只有阻塞队列为满或为空的时候生产者或消费者才会进入条件变量阻塞,那么如果pthread_cond_wait()函数出错时不让线程继续向下执行,而是循环地再次执行pthread_cond_wait(),这样也能避免一些意想不到地问题。
当然,上面的Domo只是简单的向阻塞队列当中写一个整数、取一个整数。我们现在要对它进行升级,即生产者向阻塞队列当中生产任务,消费者从阻塞队列当中取任务然后执行,这也是为什么阻塞队列的实现要使用模板的原因。我们来看升级之后的效果:
// Task.hpp
#pragma once
#include <iostream>
#include <string>
using namespace std;
/*负责计算+、-、*、/、%的类
*该类需要左操作数、右操作数、操作符构造
*对外提供函数调用运算符重载,在外部可以表现为仿函数
*计算过程的实现作为该类的私有成员*/
template <class T>
class culculateTask
{
public:
culculateTask() {}
culculateTask(const T &left, const T &right, char oper)
: _left(left), _right(right), _oper(oper)
{
}
/*对外提供一个接口,用来知道准备做什么计算*/
string getTaskName()
{
string left = to_string(_left);
string right = to_string(_right);
string ret = left + " " + _oper + " " + right + " = ?";
return ret;
}
/*提供一个函数调用运算符重载*/
void operator()()
{
T res = calculateTask(_left, _right, _oper);
string left = to_string(_left);
string right = to_string(_right);
string result = left + " " + _oper + " " + right + " = " + to_string(res);
cout << result << endl;
}
private:
T _left;
T _right;
char _oper;
/*计算的过程作为该类的私有成员*/
T calculateTask(const T &left, const T &right, char oper)
{
T result = 0;
if (oper == '+')
result = left + right;
else if (oper == '-')
result = left - right;
else if (oper == '*')
result = left * right;
else if (oper == '/')
{
if (right == 0)
{
cout << "不合法除法!" << endl;
result = -1;
}
else result = left / right;
}
else if (oper == '%')
{
if (right == 0)
{
cout << "不合法取模!" << endl;
result = -1;
}
else result = left % right;
}
return result;
}
};
// consumerProductor.cpp
#include <iostream>
#include <memory>
#include <ctime>
#include <string>
using namespace std;
#include <pthread.h>
#include "blockQueue.hpp"
#include "Task.hpp"
using namespace blockqueue;
#include <unistd.h>
/*消费者就是从阻塞队列当中拿数据*/
void *consumerDemo(void *args)
{
blockQueue<culculateTask<int>> *bq = static_cast<blockQueue<culculateTask<int>> *>(args);
while (true)
{
culculateTask<int> data;
bq->pop(&data);
cout << "consumer consumed a task: ";
data();
sleep(1);/*让消费者消费的稍微慢一些*/
}
pthread_exit(nullptr);
}
/*生产者就是向阻塞队列当中写数据*/
void *productorDemo(void *args)
{
blockQueue<culculateTask<int>> *bq = static_cast<blockQueue<culculateTask<int>> *>(args);
while (true)
{
int left = rand() % 100;
int right = rand() % 200;
char oper[5] = {'+','-','*','/','%'};
culculateTask<int> task(left,right,oper[rand() % sizeof(oper)]);
bq->push(task);
cout << "productor producted a task: " << task.getTaskName() << endl;
}
pthread_exit(nullptr);
}
int main()
{
srand((unsigned int)time(nullptr));
/*calculateTask类现在要负责整数之间的运算*/
blockQueue<culculateTask<int>> *bq = new blockQueue<culculateTask<int>>();
pthread_t productor,consumer;
pthread_create(&productor,nullptr,productorDemo,bq);
pthread_create(&consumer,nullptr,consumerDemo,bq);
pthread_join(productor,nullptr);
pthread_join(consumer,nullptr);
return 0;
}
可以看到该输出结果,生产者不断随机生产任务,消费者不断从阻塞队列当中获取任务然后执行任务。那么现在有这么一个需求,基于上面的计算任务,让线程拥有三个,一个线程充当生产者用于生产任务;一个线程充当消费者用于打印日志(实际上就是将计算结果写到文件当中);一个线程既充当生产者也充当消费者,用于执行任务和将任务的执行结果生产到阻塞队列当中。那么我们的需求的逻辑图就像下面这样:
这样的需求实现起来的成本也不高:
// Task.hpp
#pragma once
#include <iostream>
#include <string>
using namespace std;
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
/*负责将数据写到文件的类*/
class filelogTask
{
public:
filelogTask(){}
filelogTask(const string &data, const string &filename = "./log.txt")
: _data(data), _filename(filename)
{
}
void operator()()
{
FILE *fp = fopen(_filename.c_str(),"a+");
if (fp == nullptr)
{
cerr << "open file fail..." << endl;
exit(-1);
}
int n = fprintf(fp,"%s\n",_data.c_str());
if(n < 0)
{
cerr << "write fata fail..." << endl;
exit(-2);
}
fclose(fp);
}
private:
string _filename;
string _data;
};
/*负责计算+、-、*、/、%的类
*该类需要左操作数、右操作数、操作符构造
*对外提供函数调用运算符重载,在外部可以表现为仿函数
*计算过程的实现作为该类的私有成员*/
template <class T>
class culculateTask
{
public:
culculateTask() {}
culculateTask(const T &left, const T &right, char oper)
: _left(left), _right(right), _oper(oper)
{
}
/*对外提供一个接口,用来知道准备做什么计算*/
string getTaskName()
{
string left = to_string(_left);
string right = to_string(_right);
string ret = left + " " + _oper + " " + right + " = ?";
return ret;
}
/*提供一个函数调用运算符重载
*注意这里将计算结果作为了返回值*/
string operator()()
{
T res = calculateTask(_left, _right, _oper);
string left = to_string(_left);
string right = to_string(_right);
string result = left + " " + _oper + " " + right + " = " + to_string(res);
cout << result << endl;
return result;
}
private:
T _left;
T _right;
char _oper;
/*计算的过程作为该类的私有成员*/
T calculateTask(const T &left, const T &right, char oper)
{
T result = 0;
if (oper == '+')
result = left + right;
else if (oper == '-')
result = left - right;
else if (oper == '*')
result = left * right;
else if (oper == '/')
{
if (right == 0)
{
cout << "不合法除法!" << endl;
result = -1;
}
else
result = left / right;
}
else if (oper == '%')
{
if (right == 0)
{
cout << "不合法取模!" << endl;
result = -1;
}
else
result = left % right;
}
return result;
}
};
// consumerProductor.cpp
#include <iostream>
#include <memory>
#include <ctime>
#include <string>
using namespace std;
#include <pthread.h>
#include "blockQueue.hpp"
#include "Task.hpp"
using namespace blockqueue;
#include <unistd.h>
/*因为中间的消费者/生产者要存取两个阻塞队列
*所以构造一个结构体*/
template <class T>
struct blockQueues
{
blockQueue<T> *bq;
blockQueue<filelogTask> *fbq;
};
/*消费者就是从阻塞队列当中拿数据*/
void *consumerDemo(void *args)
{
blockQueue<culculateTask<int>> *bq = (static_cast<blockQueues<culculateTask<int>> *>(args))->bq;
blockQueue<filelogTask> *fbq = (static_cast<blockQueues<culculateTask<int>> *>(args))->fbq;
while (true)
{
culculateTask<int> data;
bq->pop(&data);
cout << "consumer consumed a task: ";
string ret = data();
filelogTask save(ret);/*函数返回值作为sava的构造函数的参数*/
fbq->push(save);
cout << "consumer producted a data: " << ret << endl;
sleep(1);/*让消费者消费的稍微慢一些*/
}
pthread_exit(nullptr);
}
/*生产者就是向阻塞队列当中写数据*/
void *productorDemo(void *args)
{
blockQueue<culculateTask<int>> *bq = static_cast<blockQueue<culculateTask<int>> *>(args);
while (true)
{
int left = rand() % 100;
int right = rand() % 200;
char oper[5] = {'+','-','*','/','%'};
culculateTask<int> task(left,right,oper[rand() % sizeof(oper)]);
bq->push(task);
cout << "productor producted a task: " << task.getTaskName() << endl;
}
pthread_exit(nullptr);
}
void *saverDemo(void *args)
{
blockQueue<filelogTask> *fbq = static_cast<blockQueue<filelogTask> *>(args);
while(true)
{
filelogTask save;
fbq->pop(&save);
save();
cout << "saver save data success" << endl;
}
}
int main()
{
srand((unsigned int)time(nullptr));
/*calculateTask类现在要负责整数之间的运算*/
blockQueue<culculateTask<int>> *bq = new blockQueue<culculateTask<int>>();
blockQueue<filelogTask> *fbq = new blockQueue<filelogTask>();
blockQueues<culculateTask<int>> *bqs = new blockQueues<culculateTask<int>>();
bqs->bq = bq;
bqs->fbq = fbq;
pthread_t productor,consumer,saver;
pthread_create(&productor,nullptr,productorDemo,bq);
pthread_create(&consumer,nullptr,consumerDemo,bqs);
pthread_create(&saver,nullptr,saverDemo,fbq);
pthread_join(productor,nullptr);
pthread_join(consumer,nullptr);
pthread_join(saver,nullptr);
return 0;
}
上面的Demo都是单生产者线程和单消费者线程,我们在最近的Demo当中进行升级,即修改为多生产者线程和多消费者线程版本:
// consumerProductor.cpp
#include <iostream>
#include <memory>
#include <ctime>
#include <string>
#include <vector>
using namespace std;
#include <pthread.h>
#include "blockQueue.hpp"
#include "Task.hpp"
using namespace blockqueue;
#include <unistd.h>
/*我们为了区分每个线程的名称
*所以构造一个结构体*/
template <class T>
struct blockQueues
{
blockQueue<T> *bq;
blockQueue<filelogTask> *fbq;
pthread_t tid;
string name;
};
/*消费者就是从阻塞队列当中拿数据*/
void *consumerDemo(void *args)
{
blockQueues<culculateTask<int>> *bqs = static_cast<blockQueues<culculateTask<int>> *>(args);
while (true)
{
culculateTask<int> data;
bqs->bq->pop(&data);
cout << bqs->name << "consumed a task: ";
string ret = data();
filelogTask save(ret); /*函数返回值作为sava的构造函数的参数*/
bqs->fbq->push(save);
cout << bqs->name << "producted a data: " << ret << endl;
sleep(1); /*让消费者消费的稍微慢一些*/
}
pthread_exit(nullptr);
}
/*生产者就是向阻塞队列当中写数据*/
void *productorDemo(void *args)
{
blockQueues<culculateTask<int>> *bqs = static_cast<blockQueues<culculateTask<int>> *>(args);
while (true)
{
int left = rand() % 100;
int right = rand() % 200;
char oper[5] = {'+', '-', '*', '/', '%'};
culculateTask<int> task(left, right, oper[rand() % sizeof(oper)]);
bqs->bq->push(task);
cout << bqs->name << "producted a task: " << task.getTaskName() << endl;
}
pthread_exit(nullptr);
}
void *saverDemo(void *args)
{
blockQueues<culculateTask<int>> *bqs = static_cast<blockQueues<culculateTask<int>> *>(args);
while (true)
{
filelogTask save;
bqs->fbq->pop(&save);
save();
cout << bqs->name << "save data success" << endl;
}
}
int main()
{
srand((unsigned int)time(nullptr));
vector<blockQueues<culculateTask<int>> *> vec;
/*calculateTask类现在要负责整数之间的运算*/
blockQueue<culculateTask<int>> *bq = new blockQueue<culculateTask<int>>();
blockQueue<filelogTask> *fbq = new blockQueue<filelogTask>();
for (int i = 0; i < 4; i++)
{ /*生产者线程*/
pthread_t productor;
blockQueues<culculateTask<int>> *bqs = new blockQueues<culculateTask<int>>();
bqs->bq = bq;
bqs->fbq = fbq;
bqs->name = "productor:" + to_string(i + 1);
pthread_create(&productor, nullptr, productorDemo, bqs);
bqs->tid = productor;
vec.push_back(bqs);
}
for (int i = 0; i < 4; i++)
{ /*消费者线程*/
pthread_t consumer;
blockQueues<culculateTask<int>> *bqs = new blockQueues<culculateTask<int>>();
bqs->bq = bq;
bqs->fbq = fbq;
bqs->name = "consumer:" + to_string(i + 1);
pthread_create(&consumer, nullptr, consumerDemo, bqs);
bqs->tid = consumer;
vec.push_back(bqs);
}
for (int i = 0; i < 4; i++)
{ /*日志线程*/
pthread_t saver;
blockQueues<culculateTask<int>> *bqs = new blockQueues<culculateTask<int>>();
bqs->bq = bq;
bqs->fbq = fbq;
bqs->name = "saver:" + to_string(i + 1);
pthread_create(&saver, nullptr, saverDemo, bqs);
bqs->tid = saver;
vec.push_back(bqs);
}
for (auto &e : vec)
{
pthread_join(e->tid, nullptr);
}
return 0;
}
1.2生产消费模型的效率问题
我们知道,生产者消费者模型能够让生产者和消费者解耦,然后并发的、速率不一致的执行。但是生产消费模型的意义何在?或者说它的效率究竟高在哪?实际上我们分析可以知道,生产者与生产者之间互斥、消费者与消费者之间互斥、生产者与消费者之间具有互斥和同步,这就说明无论线程有多少,最终能够进入临界区对缓冲区存取的线程在统一时刻只能有一个线程,那么从宏观来说,对缓冲区存取的操作是一个串行过程,这难道提高效率了吗?实际上我们的想法错了,生产消费模型的效率提高不再于它本身。
我们要知道,生产者要向缓冲区写的数据从何而来,可以是网络、外设、计算等等的来;消费者要执行的任务可能是网络任务、外设任务、计算任务等等。那么在生产消费模型当中,多个生产者线程之间可以是并发从外部获得任务的,当一个生产者线程向缓冲区写入的时候,不会影响其他生产者线程从外部获取任务,这就是一种并发的表现;多个消费者线程之间并发地执行任务,当一个消费者线程从缓冲区拿取地时候,不会影响其他消费者线程执行任务。所以说,生产消费模型的高效率不在存取数据这一环当中,而在于生产者生产之前、消费者消费之后,在这个过程当中,生产者与消费者之间的组合关系都在并发执行。
2.信号量
上面实现的生产消费模型存在一些不足的地方:
1.一个线程要操作临界资源的时候必须先进入临界区判断是否有操作临界资源的条件
2.线程在进入临界区之前无法得知临界资源的使用情况,即生产者没有进入临界区之前,不知道自己能不能生产;消费者没有进入临界区之前,不知道自己能不能消费。所以生产者线程和消费者线程必须先进入临界区,再判断自己能不能生产和消费
3.线程想要得知临界资源的使用情况,必须经过这些步骤:加锁、判断是否满足生产或消费的条件、执行生产或消费、解锁
我到底想表达什么呢?如果线程在进入临界区之前,即加锁之前就能够知道临界资源的使用情况的话,那么整体的执行效率一定会有提升,因为加锁、解锁的过程是非常消耗时间的。举个例子,如果现在的阻塞队列已经为满,那么很显然生产者不能再生产了,但是现在有一个生产者线程正在准备生产数据,但是该线程并不知道自己不能操作阻塞队列,因为它不知道阻塞队列已经没有空闲位置了,所以该线程依然会执行加锁操作,然后走while循环判断阻塞队列的使用情况,然后才发现阻塞队列已经满了,此时才去条件变量下等待,并且释放锁。如果我们分析能力够强的话就会发现这个生产者线程有效工作一个都没做,稀里糊涂的就将自己陷入阻塞了。所以,如果我们能够提供一种执行流进入临界区之前就能知道临界资源使用情况的机制,就能够有效提高整体的执行效率。
其实不单单是效率问题,我们还可以发现上面实现的阻塞队列当中存在很多个"小资源",即阻塞队列当中的每一个元素的存储空间。上面实现的生产消费模型在任何时刻只运行一个执行流操作阻塞队列,但是如果当阻塞队列的容量足够大时,那么多个执行流之间不能并发地操作阻塞队列就是一种"浪费"。但是对于阻塞队列来说,有效操作的位置无非就是阻塞队列头和尾,这有什么并发可言?这就是我为什么要先介绍阻塞队列实现生产消费模型的原因,因为我的目的就是要引出现在要谈的内容:如果想要实现多个执行流之间并发操作缓冲区的生产消费模型,首先使用一种能够操作多个位置的数据结构,例如环形队列(队列不支持随机访问,但是环形队列可以用数组模拟);其次提供一种提前得知资源使用情况的机制,即信号量。也就是说,我们想要实现一种多个执行流可以并发的访问缓冲区中不同位置的生产消费模型。
2.1信号量
什么是信号量:
1.信号量的本质是一个计数器,用来描述临界资源当中资源的数量。也就是说,线程在访问临界资源之前,即进入临界区之前要先申请信号量,得到信号量就可以访问临界资源当中的某一资源,当然了,并发地访问临界资源当中的不同资源是程序员行为,即由程序员控制
2.申请信号量的本质就是一种资源"预定"机制,它能够让线程进入临界区之前知道临界资源的使用情况,这样就避免了无用的加锁和解锁。也就是说得到了信号量就说明得到了资源,就可直接进入临界区从而不需要判断临界资源的使用情况;如果没有得到信号量,那么线程直接阻塞从而不需要进入临界区判断临界资源的使用情况。举一个小例子来理解这种机制:当我们要去电影院看电影的时候,我们不需要先进入电影院看有没有位置,而是先买票,而买票的过程就相当于申请信号量。如果我们成功买到了电影票,就说明我们已经锁定了一个座位,这个作为不可能被其他人占用;如果我们没有买到票,就说明电影院的座位已经满了,我们也不需要去电影院"现场勘探"。
3.信号量也是一种共享资源,它也需要一种保护机制。因为每个线程在进入临界区之前都要先申请信号量,那么就注定信号量这个东西它不是线程私有的,所以它就是一种共享资源,因为它能被所有线程看到。那么它既然作为共享资源,并且能够对资源进行计数,那么它一定会有++、--的操作,而++、--操作我们说过它们不是原子的,所以信号量使用了一种特殊的类型,即sem_t类型,并且信号量保证它的++、--操作都是原子的。
信号量的PV操作:
1.P操作:本质就是对计数器--,计数器--说明资源的数量将会少一份。所以P操作就是对资源的申请。那么信号量必须保证P操作的原子性。
2.V操作:本质就是对计数器++,计数器++说明资源的数量将会多一份。所以V操作就是对资源的释放。那么信号量必须保证V操作的原子性。
所以信号量的核心操作就是PV操作,并且由于PV操作都是原子的,所以我们成它们为PV原语。
我们所说的信号量是POSIX信号量,虽然它与System V信号量作用相同,但是POSIX可以用于线程间通信。既然是POSIX信号量,那么它的接口与其他线程接口大同小异:
1.sem_init():初始化信号量,sem_t为信号量的类型
2.sem_destroy():信号量的销毁
3.sem_wait():P操作,如果申请信号量失败,那么就阻塞等待
4.sem_post():V操作,执行该函数的线程会对已占用的资源释放,表现为将计数器++
2.2基于环形队列的生产者消费者模型
我们在上面说过,阻塞队列的操作是有限的,对于我们来说,只能操作其队头和队尾(因为我们所写的阻塞队列使用queue实现的,它的底层数据结构是链表而非数组,所以不支持随机访问)。那么环形队列我们使用数组来实现,因为我们希望它既有队列的特性又能够支持随机访问,这样就能支持我们随时计算出环形队列的不同位置的下标。
我们要先以单生产者线程、单消费者线程来讨论为什么使用环形队列和信号量,能够支持多个线程并发地操作环形队列中的不同位置。上面介绍过生产者和消费者的特性,所以不难推断出生产者关心的是环形队列当中空闲位置的数量,消费者关心的是环形队列当中已经存有有效数据的位置数量,所以需要两个信号量,一个用来表示空闲位置的数量,另一个用来表示已经存有有效数据的位置的数量。既然生产者是向空闲位置存放数据,消费者是从非空位置读取数据,并且缓冲区使用的数据结构还是环形队列,这就说明生产者和消费者很有可能处在同一位置上,而处在同一位置上又分为两种情况:
1.环形队列全为空:此时消费者不能消费,只允许生产者生产
2.环形队列全为满:此时生产者不能生产,只允许消费者消费
那么该如何理解环形队列的生产消费过程?我们以一个简单的例子来说:假设生产者与消费者之间规定一个规则,即生产者只能向环形队列当中的空闲位置生产数据,并且对一个空闲位置存放数据后立马向后寻找最近的一个空闲位置;消费者只能从存放了有效数据的位置拿取数据,并且拿取数据之后立马向后寻找最近一个存放有效数据的位置。这条规则就可以分析出三个结论:
1.当生产者和消费者都处在同一位置时,只能由其中一个对该位置进行操作
2.消费者不会超过生产者,因为环形队列的初始化状态一定全为空,所以根据规则,一定是生产者先生产数据,消费者就算消费的再快也必须等生产者先生产数据
3.生产者不会对消费者进行"超圈"行为,生产者和消费者就像赛跑一样,生产者一直在前面跑,一直掉道具,消费者在后面追,一直捡道具。如果消费者捡道具的速度特别慢,那么生产者即将"超圈"的时候就会停下,因为消费者消费的太慢了,此时环形队列已经没有空闲位置了
所以不难发现,绝大部分时候,生产者的生产过程和消费者的消费过程都在并发执行,只有当环形队列为空或为满时,生产者和消费者之间才存在互斥与同步问题!如果细致描述的话,大概就是这样:生产者要生产数据之前先申请信号量,得到信号量之后向环形队列当中的某个位置生产数据;与此同时,消费者也会申请信号量,然后从环形队列当中的某个位置拿取数据。生产者和消费者的申请信号量、生产或读取数据都在并发执行。那么当生产者得到了信号量之后就会将空位的信号量--,生产了数据之后,有效数据的信号量++;消费者得到信号量之后会将空位的信号量--,消费了一个数据之后,空闲位置的信号量++。那么当其他线程或者当前线程再次申请信号量的时候,就能够保证多个线程之间同时对环形队列当中的不同位置进行存取,并且不出错。
那么信号量能够用于互斥与同步操作,例如刚才所说,信号量能够限制一个特定规则,即生产者只能向空闲位置生产数据,消费者只能从非空闲位置拿取数据;当环形队列为空或为满时,只能互斥式地存取数据。互斥锁的本质就是一个信号量,只不过锁一般只能对一份资源计数,例如阻塞队列,那么锁描述的就是一整个阻塞队列。我们以一个逻辑图来看待生产者和消费者的执行过程:
由此图可知,无论生产者线程、消费者线程谁先被调度,都能保证生产者先生产数据。因为我们说过,申请信号量失败会导致申请信号量失败的线程陷入阻塞,阻塞表现就是P操作函数不退出。所以如果是生产者先被调度,那么它正常生产数据;如果是消费者先被调度,那么它将陷入阻塞,进而操作系统调度生产者线程生产数据。上面的逻辑图仅仅是单生产者线程、单消费者线程,它们绝大部分情况下都在并发执行。事实上缓冲区无论采用什么数据结构,都要保证我们的"三二一"原则,也就是说,无论生产者线程有几个、消费者线程有几个,同一时刻只允许一个生产者线程、一个消费者线程操作循环队列。即使是这样,使用循环队列作为缓冲的生产消费模型在效率上还是要比阻塞队列实现的生产消费模型高不少。
我们以单生产者线程和单消费者线程为例,实现环形队列的生产消费模型,并做出测试:
// ringQueue.hpp
#pragma once
#include <vector>
#include <semaphore.h>
#include <cassert>
#include <pthread.h>
namespace ringqueue
{
using namespace std;
template <class T>
class ringQueue
{
private:
#define MAXCAP 5
public:
ringQueue(const size_t &cap = MAXCAP)
: _queue(cap), _cap(cap),_consumerPos(0),_productorPos(0)
{
/*消费者信号量初始化为0
*生产者信号量初始化为队列的容量上限*/
sem_init(&_dataSem,0,0);
sem_init(&_spaceSem,0,_cap);
pthread_mutex_init(&_productorMutex,nullptr);
pthread_mutex_init(&_consumerMutex,nullptr);
}
~ringQueue()
{
sem_destroy(&_dataSem);
sem_destroy(&_spaceSem);
pthread_mutex_destroy(&_productorMutex);
pthread_mutex_destroy(&_consumerMutex);
}
void push(const T& in)
{
/*生产者生产数据之前,先申请信号量*/
P(_spaceSem);
/*生产者之间互斥地访问临界资源*/
pthread_mutex_lock(&_productorMutex);
_queue[_productorPos++] = in;
_productorPos %= _cap;
pthread_mutex_unlock(&_productorMutex);
/*生产者生产数据之后,++消费者的信号量*/
V(_dataSem);
}
void pop(T *out)
{
/*消费者消费数据之前,先申请信号量*/
P(_dataSem);
/*消费者之间互斥地访问临界资源*/
pthread_mutex_lock(&_consumerMutex);
*out = _queue[_consumerPos++];
_consumerPos %= _cap;
pthread_mutex_unlock(&_consumerMutex);
V(_spaceSem);
}
private:
/*封装P、V操作*/
void P(sem_t &sem)
{
int n = sem_wait(&sem);
assert(n != -1);
(void)n;
}
void V(sem_t &sem)
{
int n = sem_post(&sem);
assert(n != -1);
(void)n;
}
private:
vector<T> _queue; /*用数组模拟环形队列*/
/*环形队列的容量上限*/
size_t _cap;
/*消费者关注有效数据,生产者关注空闲位置
*故定义两个信号量
*信号量只能保证生产者与消费者的互斥与同步
*要保证生产者与生产者、消费者与消费者之间的互斥还需要互斥锁*/
sem_t _dataSem;
sem_t _spaceSem;
pthread_mutex_t _productorMutex;
pthread_mutex_t _consumerMutex;
/*消费者、生产者访问的下标位置*/
int _consumerPos;
int _productorPos;
};
} /*namespace ringqueue ends here*/
// consumerProductor.cpp
#include <pthread.h>
#include <memory>
#include <ctime>
#include <iostream>
#include <vector>
using namespace std;
#include "Task.hpp"
#include "ringQueue.hpp"
using namespace ringqueue;
#include <unistd.h>
void *productorDemo(void *args)
{
ringQueue<culculateTask<int>> *rq = static_cast<ringQueue<culculateTask<int>> *>(args);
while (true)
{ /*生产者生产数据*/
int left = rand() % 100;
int right = rand() % 100;
char oper[5] = {'+','-','*','/','%'};
culculateTask<int> task(left,right,oper[rand() % 5]);
rq->push(task);
cout << "productor: " << pthread_self() << " send a task: " << task.getTaskName() << endl;
}
}
void *consumerDemo(void *args)
{
ringQueue<culculateTask<int>> *rq = static_cast<ringQueue<culculateTask<int>> *>(args);
while (true)
{ /*消费者消费数据*/
culculateTask<int> task;
rq->pop(&task);
cout << "consumer :" << pthread_self() << " get a task: ";
task();/*该任务内部已经打印了计算结果*/
sleep(1);/*消费者消费的慢一些*/
}
}
int main()
{
srand((unsigned int)time(nullptr));
vector<pthread_t> vec;
/*环形队列用来存储用于整形计算的任务
*calculate打错了,打成了culculate,读者自己注意*/
ringQueue<culculateTask<int>> *rq = new ringQueue<culculateTask<int>>();
for (int i = 0; i < 4; i++)
{
pthread_t productor;
pthread_create(&productor, nullptr, productorDemo, rq);
vec.push_back(productor);
}
for (int i = 0; i < 4; i++)
{
pthread_t consumer;
pthread_create(&consumer, nullptr, consumerDemo, rq);
vec.push_back(consumer);
}
for (auto &e : vec)
{
pthread_join(e, nullptr);
}
return 0;
}
可以看到,在单CPU上执行的效果与阻塞队列的执行结果几乎没有什么差别,甚至在红色方框内还发生了竞争锁造成的饥饿问题,并且还不能体现出生产者与消费者并发执行的特点。那么在这里回答以下几个问题:
1.为什么消费者比生产者后执行?首先CPU调度线程是随机的,我们上面也分析过无论调度的是生产者还是消费者,最先运行的都是生产者,因为上面的输出结果是在单CPU下执行的,所以CPU执行生产者的业务逻辑会很快,所以知道缓冲区写满之后消费者才开始消费数据
2.为什么多个线程竞争一把锁会造成饥饿问题?上面我们说过刚解锁的线程离锁最近,所以竞争能力强,但是这种理解是非常感性的,所以现在理性分析以下。首先我们知道,竞争锁成功之后pthread_mutex_lock()函数直接退出,线程继续向下执行,这就说明加锁成功的过程直接在用户态完成;如果一个线程加锁失败,那么它将陷入阻塞,会被调度到一个阻塞队列当中,也就是说阻塞的过程需要内核来完成,也就是说线程加锁失败陷入阻塞之后需要调用一个系统调用来陷入内核,然后在一个阻塞队列上阻塞。那么当持有锁的线程解锁后,该线程处于用户态,它继续去申请锁,同时正在阻塞的线程也感应到了锁处于可用状态,所以这两个线程又同时去竞争锁;而处于用户态的线程可以直接拿到锁,处于内核态的线程需要先返回用户态才能去申请锁,而由内核态返回用户态的过程是繁琐、复杂且耗时的,因此该线程从内核态转向用户态的过程当中,其他处于用户态的线程已经把锁给拿走了,当这个线程从内核态返回到用户态之后去申请锁的时候,发现锁又没了,又继续陷入阻塞,从而造成饥饿问题。
如果我们在多CPU环境下执行该程序,可以观察到并发的场景:
非常混乱......这就是为什么多线程程序难以调试和维护的原因。
要注意申请信号量和申请锁的顺序关系,一定是申请信号量在前,申请锁在后。我们的目的就是使用信号量,使得线程在访问临界资源时不用申请锁就能够知道临界资源的使用情况,如果这两个顺序写反了,那么就是"脱裤子放屁"了。
2.3环形队列的生产消费模型的效率问题
经过上面的编码以及理论分析,我们发现环形队列版本与阻塞队列版本大差不差,只不过环形队列多作了一些优化而已。那么环形队列的生产消费模型的效率问题还是和阻塞队列的生产消费模型的效率问题一样,效率的提升不在于对缓冲区的操作当中,而在于多个生产者之间可以并发地从外部获取任务并且互不影响;多个消费者之间可以并发地执行任务并且互不影响。
但是我们一定要注意,环形队列的优化是非常重要的,因为它使用了信号量,可以避免没有意义的加锁和解锁、去条件变量中等待和从条件变量里醒来,并且我们知道,加锁和解锁、条件变量的操作都与内核有关,这就意味着一定会多次陷入内核,然后又多次从内核返回用户态,这是非常浪费系统资源、降低执行效率的东西。
3.线程池
很明显,现在要介绍的就是池化技术。事实上池化技术无处不在,例如农民,它们不会等到要吃饭的时候再去种地,而是先种地,把粮食囤起来,需要的时候再吃;饭店里的厨师不会等到客人点菜的时候再去买菜,而是先把菜买好,保存起来,等到客人点菜的时候再做......那么在C++当中,也有一个我们经常打交道的池化技术的实现,即STL当中的空间配置器。空间配置器的原理就是将空间预先开辟好,这个空间的大小在预期之内或者比预期稍大,那么当用于需要开辟空间的时候直接从已经开辟好的空间获取,而不是调用系统调用去堆上申请空间。如果用户一小块一小块的申请空间,这就意味着为调用多次系统调用,因为空间申请的太过频繁,又会触发操作系统的一些内存管理机制,记录整合内存碎片、重新分配空间等等:
线程池也是一样的道理,它也是一种池化技术。在以前的编程当中,需要处理任务时,才创建一个线程,这就类似于需要空间时就申请一次空间,会造成系统开销增大。那么同理,如同空间配置器一样,我们先预先创建一批线程,这些线程检测当前自己有没有被指定执行某个任务,然后选择休眠或执行任务:
那么从该图可以看出,线程池的本质也是一个生产消费者模型。
3.1线程池的实现
我们希望将以前写的两个小组件整合起来,即以前写过的对pthread原生线程库的封装和RAII风格的锁。线程池的实现非常简单,具体分析过程就省略了,下面附上线程池的代码:
// threadPool.hpp
#pragma once
#include <vector>
#include <queue>
#include "Thread.hpp"
#include "Mutex.hpp"
#include <unistd.h>
#include <pthread.h>
namespace threadpool
{
using namespace std;
using namespace thread;
using namespace mutex;
template <class T>
class threadPool;
template <class T>
class threadData/*传递给线程例程的参数*/
{
public:
threadPool<T> *_this;
string _name;
};
template <class T>
class threadPool
{
private:
#define INIT_THREAD_NUM 10 /*默认线程数*/
static void *start_routine(void *args)
{
threadData<T> *td = static_cast<threadData<T> *>(args);
while (true)
{
T task;
{
LockGuard lockguard(td->_this->mutex());
while (td->_this->isQueueEmpty())
{ /*如果队列为空就说明没有任务,没有任务就去条件变量阻塞*/
td->_this->queueWait();
}
task = td->_this->taskPop();
td->_this->queueUnlock();
}
/*获取任务是互斥的,但是执行任务是并发的*/
cout << td->_name << " get a task: " << task.getTaskName() << " result: " << task() << endl;
}
}
private:
/*将一些操作封装起来*/
void queueLock() { pthread_mutex_lock(&_mutex); }
void queueUnlock() { pthread_mutex_unlock(&_mutex); }
bool isQueueEmpty() { return _taskQueue.empty(); }
void queueWait() { pthread_cond_wait(&_cond, &_mutex); }
T taskPop()
{
T task = _taskQueue.front();
_taskQueue.pop();
return task;
}
pthread_mutex_t *mutex() { return &_mutex; }
public:
threadPool(int num = INIT_THREAD_NUM)
: _threadVec(num)
{
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_cond, nullptr);
for (int i = 0; i < _threadVec.size(); i++)
{
/*构造函数只负责创建线程对象
*参数的传递在start方法中*/
_threadVec[i] = new Thread();
}
}
~threadPool()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_cond);
}
/*提供给外部的接口,外部调用start接口
*就说明线程池启动,可以投递任务*/
void start()
{
for (auto &e : _threadVec)
{
threadData<T> *td = new threadData<T>();
td->_name = e->thread_name();
td->_this = this;
e->start(start_routine, td);
cout << e->thread_name() << " start ..." << endl;
}
}
/*向线程池当中添加任务*/
void push(const T &in)
{
LockGuard lockguard(&_mutex);
_taskQueue.push(in);
pthread_cond_signal(&_cond);
}
private:
/*vector:管理线程的数据结构
*queue:管理任务的数据结构*/
vector<Thread *> _threadVec;
queue<T> _taskQueue;
/*未来肯定是要访问队列的,所以队列是一个共享资源
*那么就需要锁
*如果线程没有拿到任务,那么就要去条件变量下阻塞*/
pthread_mutex_t _mutex;
pthread_cond_t _cond;
};
} /*namespace threadpool ends here*/
这里我对以前封装原生线程库进行了一些修改,使得在构造函数负责线程名的创建,在start方法才负责线程的启动:
// Thread.hpp
#pragma once
#include <functional>
#include <iostream>
#include <cstring>
#include <string>
#include <cerrno>
#include <pthread.h>
namespace thread
{
using namespace std;
/*
*外部创建线程时需要指定一个函数指针和一个参数(void *类型)
*Thrad类将该回调函数用包装器包装
*/
class Thread
{
public:
typedef function<void *(void *)> func_t;
public:
Thread()
{
char nameBuffer[1024];
snprintf(nameBuffer,sizeof(nameBuffer),"thread %d",_threadId++);
_name = nameBuffer;
}
/*
*线程的运行我们想手动控制
*但是pthread_create指定的回调函数为void *start_routine(void *)类型
*所以无法给pthread_create传递包装器类型,故在此函数之后定义一个静态成员函数
*将this指针传递给该静态成员函数,在静态成员函数里面再回调包装器包装的函数
*/
void start(const func_t& func,void *args)
{
_func = func;
_args = args;
int n = pthread_create(&_tid,nullptr,start_routine,(void *)this);
if(n != 0)
{
cout << "error code:" << n << " " << strerror(n) << endl;
exit(n);
}
}
/*
*如果该函数不是静态成员函数,那么就有一个隐藏的this指针
*因为没有this指针,所以无法直接使用成员变量_func
*所以我们在pthread_create()中手动传递一个this指针
*/
static void *start_routine(void *args)
{
Thread *_this = static_cast<Thread *>(args);
return _this->_func(_this->_args);
}
const string &thread_name()
{
return _name;
}
/*线程等待,将返回值一并带出*/
void *join()
{
void *ret;
int n = pthread_join(_tid,&ret);
if(n != 0)
{
cout << "error code:" << n << " " << strerror(n) << endl;
exit(n);
}
return ret;
}
private:
pthread_t _tid;
void *_args;
func_t _func;
string _name;
static size_t _threadId;
};
size_t Thread::_threadId = 1;
}// the namespace ly ends here
RAII风格的锁的代码没有变化,这里就不再展示了。对于线程池当中的任务,依然使用上面的Task.hpp,这里就不展示了。最后附上测试用例:
#include "threadPool.hpp"
using namespace threadpool;
#include "Task.hpp"
#include <iostream>
#include <memory>
using namespace std;
#include <unistd.h>
int main()
{
/*线程当中的任务队列存储整数运算任务*/
unique_ptr<threadPool<culculateTask<int>>> uptp(new threadPool<culculateTask<int>>());
uptp->start();
int left = 0,right = 0;
char oper;
while(true)
{
cout << "Input left number# ";
cin >> left;
cout << "Input right number# ";
cin >> right;
cout << "Input oper# ";
cin >> oper;
culculateTask<int> task(left,right,oper);
uptp->push(task);
usleep(1234);
}
return 0;
}
执行结果