线程池的应用
在我认知中,任何网络服务器都是一个死循环。这个死循环长下面这个样子。
基本上服务器框架都是基于这个架构而不断开发拓展的。
这个死循环总共分为四个步骤,可以涵盖所有客户端的需求,然而目前绝大多数企业不会用这样的架构。
问题在于容易产生阻塞。
作为客户端,我们当然希望访问服务器的时候,能够在短时间内收到回复,意味着自己连接上了该服务器。但是上述架构却很容易产生响应延迟。
当某一个连接的2.3.4时间过长,也许是因为客户上传了很大的数据,也许是因为业务处理起来比较麻烦,需要计算很多东西,也许是客户需要下载很大的东西,总之只要2,3,4的时间延迟,意味着下一个循环处理其它连接的动作也会被无限延迟。
也就是说,系统需要处理一个很慢的客户端的连接,后面的所有连接,哪怕只是耗时很短的任务,都需要等这个很慢的任务完成才能进行。
所以除了像redis的服务器,数据库都是基于hash的key-value结构,业务处理起来十分快速,才会将1,2,3,4都在一个线程中完成,其它的服务器若要提供千万乃至亿级别的客户接入量,必须更快地处理客户的连接,解除1和234之间的耦合,这才引入了多线程,我的主线程只负责1,然后将2,3,4分发到其它线程中执行。
然而,如果服务器选择这种多线程架构,当我们面临着巨大的客户端流量,则势必需要频繁地创建和销毁线程,这个过程十分浪费系统资源,还容易造成系统崩溃,然后老板震惊,被迫毕业,流落街头,思之令人发笑。
解决办法就是线程池。
我们预先创建好一系列线程,就好比后宫佳丽三千,然后皇上(线程池中枢)来了兴致(收到任务),就去翻一个妃子(线程池中某个线程)的牌子。妃子(线程)解决完需求后,回到后宫(线程池),等待下一次召唤。
不用创建和销毁,而是回收利用,所有池式结构都可以看做是一种对资源调度的缓冲,这就是线程池的精髓。
线程池设计
我们手撕线程池,目的还是搞懂基本原理,不弄太多花里胡哨的架构,比如工厂模式之类的。
当前这个版本的线程池是基于互斥锁和条件变量实现的。
预告(画饼):无锁线程池后续也会手撕。
线程池总体上可以分为三大组件。
-
任务队列(存还没有执行的任务)
-
执行队列(可以看成就是线程池,存放着可以用来执行任务的线程)
-
线程池管理中枢(负责封装前两个类,任务的分发,线程池的创建,销毁,等等。对外提供统一的接口)
其工作流程大概如图所示
任务队列节点数据结构
//- 任务队列元素
class TaskEle{
public:
void (*taskCallback)(void *arg);
string user_data;
void setFunc(void (*tcb)(void *arg)){
taskCallback = tcb;
}
};
任务队列负责存还没有执行的业务,我们可以将每个业务都抽象成一个函数,每个函数自然有可能需要参数。
所以任务队列的节点需要两个成员:
-
taskCallback:函数回调,执行客户端想要的业务。
-
user_data:函数参数,包含客户端的信息,比如socketfd等。
顺便提供了一个接口,可以修改回调函数。
相关视频推荐
手把手实现线程池(120行),实现异步操作,解决项目性能问题
手撕高性能线程池,准备好linux开发环境(另一版本)
线程池在3个开源框架的应用(redis、skynet、workflow)
免费学习地址:c/c++ linux服务器开发/后台架构师
需要C/C++ Linux服务器架构师学习资料加qun812855908获取(资料包括C/C++,Linux,golang技术,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,TCP/IP,协程,DPDK,ffmpeg等),免费分享
执行队列节点数据结构
//-执行队列元素
class ExecEle{
public:
pthread_t tid;
bool usable = true;
ThreadPool* pool;
static void* start(void* arg);
};
-
tid:每个节点都对应一个线程,所以需要一个id成员来存线程id,
-
usable:这个成员非常妙,它代表当前线程是否可用,默认为true,一旦设置为false,则该线程会结束。
-
使用usable可以在最后销毁线程池的时候,以一种优雅的方式结束每个线程,而代替pthread_cancel这种强制销毁线程的方式,因为你不知道线程中的任务是否处理完,强制销毁就会使某些业务中断。
-
pool:这个成员是指向中枢管理(后面会讲)的指针,主要是为了在每个线程中通过pool获取到一个全局(对于所有线程池线程共享)的互斥锁和条件变量。
-
start:是线程池对象执行的一个实现线程再回收利用的任务循环。具体实现代码也是在后面会讲。
线程池管理中枢设计
总体结构:
class ThreadPool{
public:
//-任务队列和执行队列
deque<TaskEle*> task_queue;
deque<ExecEle*> exec_queue;
//-条件变量
pthread_cond_t cont;
//-互斥锁
pthread_mutex_t mutex;
//-线程池大小
int thread_count;
//-构造函数
ThreadPool(int thread_count):thread_count(thread_count);
//-创建线程池
void createPool();
//-加入任务
void push_task(void(*tcb)(void* arg),int i);
//-利用析构销毁线程池
~ThreadPool();
};
关于数据成员:
-
task_queue、exec_queue: 任务队列和执行队列,我使用deque作为容器实现队列。
-
cont:所有线程共享的条件变量。
-
mutex:所有线程共享的互斥锁。
-
thread_count: 线程池创建的时候,初始大小
关于成员方法:
-
ThreadPool:构造函数
-
createPool:创建线程池
-
push_task: 给服务器主循环用的,给线程池添加任务。
-
~ ThreadPool: 销毁线程池,事实上应该单独定义一个destroyPool的api,我这里为了简便合并到析构中了。
ExecEle的start函数实现
现在对于ThreadPool对象有概念以后,可以先将刚刚执行队列节点ExecEle的start函数实现,其代表了每个线程池的线程始终在跑的循环,在无任务分配的时候阻塞在某个位置。
void* ExecEle::start(void*arg){
//-获得执行对象
ExecEle *ee = (ExecEle*)arg;
while(true){
//-加锁
pthread_mutex_lock(&(ee->pool->mutex));
while(ee->pool->task_queue.empty()){//-如果任务队列为空,等待新任务
if(!ee->usable){
break;
}
pthread_cond_wait(&ee->pool->cont, &ee->pool->mutex);
}
if(!ee -> usable){
pthread_mutex_unlock(&ee -> pool -> mutex);
break;
}
TaskEle *te = ee->pool->task_queue.front();
ee->pool->task_queue.pop_front();
//-解锁
pthread_mutex_unlock(&(ee->pool -> mutex));
//-执行任务回调
te->user_data+=to_string(pthread_self());
te->taskCallback(te);
}
//-删除线程执行对象
delete ee;
fprintf(stdout,"destroy thread %d\n",pthread_self());
return NULL;
}
arg参数指向的是该线程函数对应的执行元素ExecEle本身的指针,我们定义其为ee。然后进入死循环,通过ee,我们可以获得线程池中枢对象pool。
通过pool。我们可以获得任务队列的情况,当任务队列为空,则线程进入阻塞状态,等待任务队列有任务进来后,通过条件变量通知,再恢复执行。
恢复执行后,从任务队列中取出队首的任务,这个过程需要在mutex的范围内,保证独占性。
之后解除互斥锁,开始执行任务的回调。执行完进行入下个循环,尝试再次获得互斥锁。
最后说一说usable,当我们销毁线程池的时候,设置每一个线程的usable为false,那么不会立刻中断每个线程正在执行的回调,而是等回调结束后,在下一次循环中如果检测到usable为false后,就会退出整个大循环,并释放自己的锁,唤醒线程池其它休眠的线程。退出大循环后,线程自然而优雅地结束。
之后是ThreadPool自己的api实现
构造函数ThreadPool实现:
ThreadPool(int thread_count):thread_count(thread_count){
//-初始化条件变量和互斥锁
pthread_cond_init(&cont,NULL);
pthread_mutex_init(&mutex,NULL);
}
主要为了初始化cont,mutex,thread_count。
创建线程池createPool实现:
void createPool(){
int ret;
//-初始执行队列
for(int i = 0;i<thread_count;++i){
ExecEle *ee = new ExecEle;
ee->pool = const_cast<ThreadPool*>(this);
if(ret = pthread_create(&(ee->tid),NULL,ee->start,ee)){
delete ee;
ERR_EXIT_THREAD(ret,"pthread_create");
}
fprintf(stdout,"create thread %d\n",i);
exec_queue.push_back(ee);
}
fprintf(stdout,"create pool finish...\n");
}
通过pthread_create创建thread_count个线程,每个线程执行自己的start函数进入等待任务循环,并阻塞在锁和条件变量的位置。将exec对象push进执行队列。
添加任务 push_task实现:
void push_task(void(*tcb)(void* arg),int i){
TaskEle *te = new TaskEle;
te->setFunc(tcb);
te->user_data = "Task "+to_string(i)+" run in thread ";
//-加锁
pthread_mutex_lock(&mutex);
task_queue.push_back(te);
//-通知执行队列中的一个进行任务
pthread_cond_signal(&cont);
//-解锁
pthread_mutex_unlock(&mutex);
}
主要功能是构造TaskEle对象并加入到执行队列中。
每个TaskEle可能需要执行不同的业务,所以push_task需要传入对应业务的回调tcb(task callback)
i是我加的额外参数,代表主线程中连接的客户端编号,其意义可以是socketfd。
注意在修改执行队列(push)的时候,需要加锁保证独占。
销毁线程池~ ThreadPool 实现:
~ThreadPool() {
for(int i = 0;i<exec_queue.size();++i){
exec_queue[i]->usable = false;
}
pthread_mutex_lock(&mutex);
//-清空任务队列
task_queue.clear();
//-广播给每个执行线程令其退出(执行线程破开循环会free掉堆内存)
pthread_cond_broadcast(&cont);
pthread_mutex_unlock(&mutex);//-让其他线程拿到锁
//-等待所有线程退出
for(int i = 0;i<exec_queue.size(); ++i){
pthread_join(exec_queue[i] -> tid,NULL);
}
//-清空执行队列
exec_queue.clear();
//-销毁锁和条件变量
pthread_cond_destroy(&cont);
pthread_mutex_destroy(&mutex);
}
先将所有线程的usable设置为false,之后加锁,清空任务队列,并通过条件变量通知所有线程,等所有线程退出后,销毁执行队列,销毁锁和条件变量。
业务代码和服务器主循环
//-线程执行的业务函数
void execFunc(void* arg){
TaskEle *te =(TaskEle*)arg;
fprintf(stdout, "%s\n",te->user_data.c_str());
};
int main(){
//-创建线程池
ThreadPool pool(100);
pool.createPool();
//-创建任务
for(int i =0;i<1000;++i){
pool.push_task(&execFunc,i);
}
exit(EXIT_SUCCESS);
}
随便写的线程执行的业务,打印一下客户信息。
主线程创建100大小的线程池,并添加1000个任务(连接)。
完整代码
// *C++和posix接口实现一个线程池
//-三个组件:任务队列,执行队列,线程池(中枢管理)
#include <unistd.h>
#include <pthread.h>
#include <deque>
#include <string>
#include <iostream>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
using namespace std;
//-打印线程错误专用,根据err来识别错误信息
static inline void ERR_EXIT_THREAD(int err, const char * msg){
fprintf(stderr,"%s:%s\n",strerror(err),msg);
exit(EXIT_FAILURE);
}
class ThreadPool;//-声明
//- 任务队列元素
class TaskEle{
public:
void (*taskCallback)(void *arg);
string user_data;
void setFunc(void (*tcb)(void *arg)){
taskCallback = tcb;
}
};
//-执行队列元素
class ExecEle{
public:
pthread_t tid;
bool usable = true;
ThreadPool* pool;
static void* start(void* arg);
};
//-线程池
class ThreadPool{
public:
//-任务队列和执行队列
deque<TaskEle*> task_queue;
deque<ExecEle*> exec_queue;
//-条件变量
pthread_cond_t cont;
//-互斥锁
pthread_mutex_t mutex;
//-线程池大小
int thread_count;
//-构造函数
ThreadPool(int thread_count):thread_count(thread_count){
//-初始化条件变量和互斥锁
pthread_cond_init(&cont,NULL);
pthread_mutex_init(&mutex,NULL);
}
void createPool(){
int ret;
//-初始执行队列
for(int i = 0;i<thread_count;++i){
ExecEle *ee = new ExecEle;
ee->pool = const_cast<ThreadPool*>(this);
if(ret = pthread_create(&(ee->tid),NULL,ee->start,ee)){
delete ee;
ERR_EXIT_THREAD(ret,"pthread_create");
}
fprintf(stdout,"create thread %d\n",i);
exec_queue.push_back(ee);
}
fprintf(stdout,"create pool finish...\n");
}
//-加入任务
void push_task(void(*tcb)(void* arg),int i){
TaskEle *te = new TaskEle;
te->setFunc(tcb);
te->user_data = "Task "+to_string(i)+" run in thread ";
//-加锁
pthread_mutex_lock(&mutex);
task_queue.push_back(te);
//-通知执行队列中的一个进行任务
pthread_cond_signal(&cont);
//-解锁
pthread_mutex_unlock(&mutex);
}
//-销毁线程池
~ThreadPool() {
for(int i = 0;i<exec_queue.size();++i){
exec_queue[i]->usable = false;
}
pthread_mutex_lock(&mutex);
//-清空任务队列
task_queue.clear();
//-广播给每个执行线程令其退出(执行线程破开循环会free掉堆内存)
pthread_cond_broadcast(&cont);
pthread_mutex_unlock(&mutex);//-让其他线程拿到锁
//-等待所有线程退出
for(int i = 0;i<exec_queue.size(); ++i){
pthread_join(exec_queue[i] -> tid,NULL);
}
//-清空执行队列
exec_queue.clear();
//-销毁锁和条件变量
pthread_cond_destroy(&cont);
pthread_mutex_destroy(&mutex);
}
};
void* ExecEle::start(void*arg){
//-获得执行对象
ExecEle *ee = (ExecEle*)arg;
while(true){
//-加锁
pthread_mutex_lock(&(ee->pool->mutex));
while(ee->pool->task_queue.empty()){//-如果任务队列为空,等待新任务
if(!ee->usable){
break;
}
pthread_cond_wait(&ee->pool->cont, &ee->pool->mutex);
}
if(!ee -> usable){
pthread_mutex_unlock(&ee -> pool -> mutex);
break;
}
TaskEle *te = ee->pool->task_queue.front();
ee->pool->task_queue.pop_front();
//-解锁
pthread_mutex_unlock(&(ee->pool -> mutex));
//-执行任务回调
te->user_data+=to_string(pthread_self());
te->taskCallback(te);
}
//-删除线程执行对象
delete ee;
fprintf(stdout,"destroy thread %d\n",pthread_self());
return NULL;
}
//-线程执行的业务函数
void execFunc(void* arg){
TaskEle *te =(TaskEle*)arg;
fprintf(stdout, "%s\n",te->user_data.c_str());
};
int main(){
//-创建线程池
ThreadPool pool(100);
pool.createPool();
//-创建任务
for(int i =0;i<1000;++i){
pool.push_task(&execFunc,i);
}
exit(EXIT_SUCCESS);
}