c++代码实现一个线程池(清晰明了,一看就懂)

news2024/11/20 18:43:07

线程池的应用

在我认知中,任何网络服务器都是一个死循环。这个死循环长下面这个样子。

基本上服务器框架都是基于这个架构而不断开发拓展的。

这个死循环总共分为四个步骤,可以涵盖所有客户端的需求,然而目前绝大多数企业不会用这样的架构。

问题在于容易产生阻塞。

作为客户端,我们当然希望访问服务器的时候,能够在短时间内收到回复,意味着自己连接上了该服务器。但是上述架构却很容易产生响应延迟。

当某一个连接的2.3.4时间过长,也许是因为客户上传了很大的数据,也许是因为业务处理起来比较麻烦,需要计算很多东西,也许是客户需要下载很大的东西,总之只要2,3,4的时间延迟,意味着下一个循环处理其它连接的动作也会被无限延迟。

也就是说,系统需要处理一个很慢的客户端的连接,后面的所有连接,哪怕只是耗时很短的任务,都需要等这个很慢的任务完成才能进行。

所以除了像redis的服务器,数据库都是基于hash的key-value结构,业务处理起来十分快速,才会将1,2,3,4都在一个线程中完成,其它的服务器若要提供千万乃至亿级别的客户接入量,必须更快地处理客户的连接,解除1和234之间的耦合,这才引入了多线程,我的主线程只负责1,然后将2,3,4分发到其它线程中执行。

然而,如果服务器选择这种多线程架构,当我们面临着巨大的客户端流量,则势必需要频繁地创建和销毁线程,这个过程十分浪费系统资源,还容易造成系统崩溃,然后老板震惊,被迫毕业,流落街头,思之令人发笑。

解决办法就是线程池。

我们预先创建好一系列线程,就好比后宫佳丽三千,然后皇上(线程池中枢)来了兴致(收到任务),就去翻一个妃子(线程池中某个线程)的牌子。妃子(线程)解决完需求后,回到后宫(线程池),等待下一次召唤。

不用创建和销毁,而是回收利用,所有池式结构都可以看做是一种对资源调度的缓冲,这就是线程池的精髓。

线程池设计

我们手撕线程池,目的还是搞懂基本原理,不弄太多花里胡哨的架构,比如工厂模式之类的。

当前这个版本的线程池是基于互斥锁和条件变量实现的。

预告(画饼):无锁线程池后续也会手撕。

线程池总体上可以分为三大组件。

  • 任务队列(存还没有执行的任务)

  • 执行队列(可以看成就是线程池,存放着可以用来执行任务的线程)

  • 线程池管理中枢(负责封装前两个类,任务的分发,线程池的创建,销毁,等等。对外提供统一的接口)

其工作流程大概如图所示

任务队列节点数据结构

//- 任务队列元素
class TaskEle{
public:
    void (*taskCallback)(void *arg);
    string user_data;
    void setFunc(void (*tcb)(void *arg)){
        taskCallback = tcb;
    }
};

任务队列负责存还没有执行的业务,我们可以将每个业务都抽象成一个函数,每个函数自然有可能需要参数。

所以任务队列的节点需要两个成员:

  • taskCallback:函数回调,执行客户端想要的业务。

  • user_data:函数参数,包含客户端的信息,比如socketfd等。

顺便提供了一个接口,可以修改回调函数。

相关视频推荐

手把手实现线程池(120行),实现异步操作,解决项目性能问题

手撕高性能线程池,准备好linux开发环境(另一版本)

线程池在3个开源框架的应用(redis、skynet、workflow)

免费学习地址:c/c++ linux服务器开发/后台架构师

需要C/C++ Linux服务器架构师学习资料加qun812855908获取(资料包括C/C++,Linux,golang技术,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,TCP/IP,协程,DPDK,ffmpeg等),免费分享

执行队列节点数据结构

//-执行队列元素
class ExecEle{
public:
    pthread_t tid;
    bool usable = true;
    ThreadPool* pool;
    static void* start(void* arg);
};
  • tid:每个节点都对应一个线程,所以需要一个id成员来存线程id,

  • usable:这个成员非常妙,它代表当前线程是否可用,默认为true,一旦设置为false,则该线程会结束。

  • 使用usable可以在最后销毁线程池的时候,以一种优雅的方式结束每个线程,而代替pthread_cancel这种强制销毁线程的方式,因为你不知道线程中的任务是否处理完,强制销毁就会使某些业务中断。

  • pool:这个成员是指向中枢管理(后面会讲)的指针,主要是为了在每个线程中通过pool获取到一个全局(对于所有线程池线程共享)的互斥锁和条件变量。

  • start:是线程池对象执行的一个实现线程再回收利用的任务循环。具体实现代码也是在后面会讲。

线程池管理中枢设计

总体结构:

class ThreadPool{
public:
    //-任务队列和执行队列
    deque<TaskEle*> task_queue;
    deque<ExecEle*> exec_queue;
    //-条件变量
    pthread_cond_t cont;
    //-互斥锁
    pthread_mutex_t mutex;
    //-线程池大小
    int thread_count;
    //-构造函数
    ThreadPool(int thread_count):thread_count(thread_count);
    //-创建线程池
    void createPool();
    //-加入任务
    void push_task(void(*tcb)(void* arg),int i);
    //-利用析构销毁线程池
    ~ThreadPool();
};

关于数据成员:

  • task_queue、exec_queue: 任务队列和执行队列,我使用deque作为容器实现队列。

  • cont:所有线程共享的条件变量。

  • mutex:所有线程共享的互斥锁。

  • thread_count: 线程池创建的时候,初始大小

关于成员方法:

  • ThreadPool:构造函数

  • createPool:创建线程池

  • push_task: 给服务器主循环用的,给线程池添加任务。

  • ~ ThreadPool: 销毁线程池,事实上应该单独定义一个destroyPool的api,我这里为了简便合并到析构中了。

ExecEle的start函数实现

现在对于ThreadPool对象有概念以后,可以先将刚刚执行队列节点ExecEle的start函数实现,其代表了每个线程池的线程始终在跑的循环,在无任务分配的时候阻塞在某个位置。

void* ExecEle::start(void*arg){
    //-获得执行对象
    ExecEle *ee = (ExecEle*)arg;
    while(true){
        //-加锁
        pthread_mutex_lock(&(ee->pool->mutex));
        while(ee->pool->task_queue.empty()){//-如果任务队列为空,等待新任务
            if(!ee->usable){
                break;
            }
            pthread_cond_wait(&ee->pool->cont, &ee->pool->mutex);
        }
        if(!ee -> usable){
            pthread_mutex_unlock(&ee -> pool -> mutex);
            break;
        }
        TaskEle *te = ee->pool->task_queue.front();
        ee->pool->task_queue.pop_front();
        //-解锁
        pthread_mutex_unlock(&(ee->pool -> mutex));
        //-执行任务回调
        te->user_data+=to_string(pthread_self());
        te->taskCallback(te);
    }
    //-删除线程执行对象
    delete ee;
    fprintf(stdout,"destroy thread %d\n",pthread_self());
    return NULL;
}

arg参数指向的是该线程函数对应的执行元素ExecEle本身的指针,我们定义其为ee。然后进入死循环,通过ee,我们可以获得线程池中枢对象pool。

通过pool。我们可以获得任务队列的情况,当任务队列为空,则线程进入阻塞状态,等待任务队列有任务进来后,通过条件变量通知,再恢复执行。

恢复执行后,从任务队列中取出队首的任务,这个过程需要在mutex的范围内,保证独占性。

之后解除互斥锁,开始执行任务的回调。执行完进行入下个循环,尝试再次获得互斥锁。

最后说一说usable,当我们销毁线程池的时候,设置每一个线程的usable为false,那么不会立刻中断每个线程正在执行的回调,而是等回调结束后,在下一次循环中如果检测到usable为false后,就会退出整个大循环,并释放自己的锁,唤醒线程池其它休眠的线程。退出大循环后,线程自然而优雅地结束。

之后是ThreadPool自己的api实现

构造函数ThreadPool实现:

ThreadPool(int thread_count):thread_count(thread_count){
        //-初始化条件变量和互斥锁
        pthread_cond_init(&cont,NULL);
        pthread_mutex_init(&mutex,NULL);
    }

主要为了初始化cont,mutex,thread_count。

创建线程池createPool实现:

void createPool(){
        int ret;
        //-初始执行队列
        for(int i = 0;i<thread_count;++i){
            ExecEle *ee = new ExecEle;
            ee->pool = const_cast<ThreadPool*>(this);
            if(ret = pthread_create(&(ee->tid),NULL,ee->start,ee)){
                delete ee;
                ERR_EXIT_THREAD(ret,"pthread_create");
            }
            fprintf(stdout,"create thread %d\n",i);
            exec_queue.push_back(ee);
        }
        fprintf(stdout,"create pool finish...\n");
    }

通过pthread_create创建thread_count个线程,每个线程执行自己的start函数进入等待任务循环,并阻塞在锁和条件变量的位置。将exec对象push进执行队列。

添加任务 push_task实现:

void push_task(void(*tcb)(void* arg),int i){
        TaskEle *te = new TaskEle;
        te->setFunc(tcb);
        te->user_data = "Task "+to_string(i)+" run in thread ";
        //-加锁
        pthread_mutex_lock(&mutex);
        task_queue.push_back(te);
        //-通知执行队列中的一个进行任务
        pthread_cond_signal(&cont);
        //-解锁
        pthread_mutex_unlock(&mutex);
    }

主要功能是构造TaskEle对象并加入到执行队列中。

每个TaskEle可能需要执行不同的业务,所以push_task需要传入对应业务的回调tcb(task callback)

i是我加的额外参数,代表主线程中连接的客户端编号,其意义可以是socketfd。

注意在修改执行队列(push)的时候,需要加锁保证独占。

销毁线程池~ ThreadPool 实现:

~ThreadPool() {
        for(int i = 0;i<exec_queue.size();++i){
            exec_queue[i]->usable = false;
        }
        pthread_mutex_lock(&mutex);
        //-清空任务队列
        task_queue.clear();
        //-广播给每个执行线程令其退出(执行线程破开循环会free掉堆内存)
        pthread_cond_broadcast(&cont);
        pthread_mutex_unlock(&mutex);//-让其他线程拿到锁
        //-等待所有线程退出
        for(int i = 0;i<exec_queue.size(); ++i){
            pthread_join(exec_queue[i] -> tid,NULL);
        }
        //-清空执行队列
        exec_queue.clear();
        //-销毁锁和条件变量
        pthread_cond_destroy(&cont);
        pthread_mutex_destroy(&mutex);
    }

先将所有线程的usable设置为false,之后加锁,清空任务队列,并通过条件变量通知所有线程,等所有线程退出后,销毁执行队列,销毁锁和条件变量。

业务代码和服务器主循环

//-线程执行的业务函数
void execFunc(void* arg){
    TaskEle *te =(TaskEle*)arg;
    fprintf(stdout, "%s\n",te->user_data.c_str());
};

int main(){
    //-创建线程池
    ThreadPool pool(100);
    pool.createPool();
    //-创建任务
    for(int i =0;i<1000;++i){
        pool.push_task(&execFunc,i);
    }
    exit(EXIT_SUCCESS);
}

随便写的线程执行的业务,打印一下客户信息。

主线程创建100大小的线程池,并添加1000个任务(连接)。

完整代码

// *C++和posix接口实现一个线程池
//-三个组件:任务队列,执行队列,线程池(中枢管理)

#include <unistd.h>
#include <pthread.h>
#include <deque>
#include <string>
#include <iostream>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

using namespace std;

//-打印线程错误专用,根据err来识别错误信息
static inline void ERR_EXIT_THREAD(int err, const char * msg){
    fprintf(stderr,"%s:%s\n",strerror(err),msg);
    exit(EXIT_FAILURE);
}

class ThreadPool;//-声明

//- 任务队列元素
class TaskEle{
public:
    void (*taskCallback)(void *arg);
    string user_data;
    void setFunc(void (*tcb)(void *arg)){
        taskCallback = tcb;
    }
};

//-执行队列元素
class ExecEle{
public:
    pthread_t tid;
    bool usable = true;
    ThreadPool* pool;
    static void* start(void* arg);
};

//-线程池
class ThreadPool{
public:
    //-任务队列和执行队列
    deque<TaskEle*> task_queue;
    deque<ExecEle*> exec_queue;
    //-条件变量
    pthread_cond_t cont;
    //-互斥锁
    pthread_mutex_t mutex;
    //-线程池大小
    int thread_count;
    //-构造函数
    ThreadPool(int thread_count):thread_count(thread_count){
        //-初始化条件变量和互斥锁
        pthread_cond_init(&cont,NULL);
        pthread_mutex_init(&mutex,NULL);
    }
    void createPool(){
        int ret;
        //-初始执行队列
        for(int i = 0;i<thread_count;++i){
            ExecEle *ee = new ExecEle;
            ee->pool = const_cast<ThreadPool*>(this);
            if(ret = pthread_create(&(ee->tid),NULL,ee->start,ee)){
                delete ee;
                ERR_EXIT_THREAD(ret,"pthread_create");
            }
            fprintf(stdout,"create thread %d\n",i);
            exec_queue.push_back(ee);
        }
        fprintf(stdout,"create pool finish...\n");
    }
    //-加入任务
    void push_task(void(*tcb)(void* arg),int i){
        TaskEle *te = new TaskEle;
        te->setFunc(tcb);
        te->user_data = "Task "+to_string(i)+" run in thread ";
        //-加锁
        pthread_mutex_lock(&mutex);
        task_queue.push_back(te);
        //-通知执行队列中的一个进行任务
        pthread_cond_signal(&cont);
        //-解锁
        pthread_mutex_unlock(&mutex);

    }
    //-销毁线程池
    ~ThreadPool() {
        for(int i = 0;i<exec_queue.size();++i){
            exec_queue[i]->usable = false;
        }
        pthread_mutex_lock(&mutex);
        //-清空任务队列
        task_queue.clear();
        //-广播给每个执行线程令其退出(执行线程破开循环会free掉堆内存)
        pthread_cond_broadcast(&cont);
        pthread_mutex_unlock(&mutex);//-让其他线程拿到锁
        //-等待所有线程退出
        for(int i = 0;i<exec_queue.size(); ++i){
            pthread_join(exec_queue[i] -> tid,NULL);
        }
        //-清空执行队列
        exec_queue.clear();
        //-销毁锁和条件变量
        pthread_cond_destroy(&cont);
        pthread_mutex_destroy(&mutex);
    }
};

void* ExecEle::start(void*arg){
    //-获得执行对象
    ExecEle *ee = (ExecEle*)arg;
    while(true){
        //-加锁
        pthread_mutex_lock(&(ee->pool->mutex));
        while(ee->pool->task_queue.empty()){//-如果任务队列为空,等待新任务
            if(!ee->usable){
                break;
            }
            pthread_cond_wait(&ee->pool->cont, &ee->pool->mutex);
        }
        if(!ee -> usable){
            pthread_mutex_unlock(&ee -> pool -> mutex);
            break;
        }
        TaskEle *te = ee->pool->task_queue.front();
        ee->pool->task_queue.pop_front();
        //-解锁
        pthread_mutex_unlock(&(ee->pool -> mutex));
        //-执行任务回调
        te->user_data+=to_string(pthread_self());
        te->taskCallback(te);
    }
    //-删除线程执行对象
    delete ee;
    fprintf(stdout,"destroy thread %d\n",pthread_self());
    return NULL;
}


//-线程执行的业务函数
void execFunc(void* arg){
    TaskEle *te =(TaskEle*)arg;
    fprintf(stdout, "%s\n",te->user_data.c_str());
};

int main(){
    //-创建线程池
    ThreadPool pool(100);
    pool.createPool();
    //-创建任务
    for(int i =0;i<1000;++i){
        pool.push_task(&execFunc,i);
    }
    exit(EXIT_SUCCESS);
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/603024.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

带有 Utopia Messenger 的免费 ChatGPT 助手

全世界都喜欢 ChatGPT&#xff01;而且这个工具从未如此易于访问。没有限制、没有VPN、没有代理和佣金。现在您可以使用 Utopia Messenger 的传奇聊天的全部功能。 ChatGPT是每个线上用户的必备工具。它为我们提供任何所需的数据&#xff0c;生成内容&#xff0c;并解决多项任…

[数据集][目标检测]目标检测数据集黄瓜数据集VOC格式1309张

数据集格式&#xff1a;Pascal VOC格式(不包含分割路径的txt文件和yolo格式的txt文件&#xff0c;仅仅包含jpg图片和对应的xml) 图片数量(jpg文件个数)&#xff1a;1308 标注数量(xml文件个数)&#xff1a;1308 标注类别数&#xff1a;1 标注类别名称:["cucumber"] 每…

图论与算法(1)图论概念

1. 图论与算法 在计算机科学中&#xff0c;图论与算法是两个重要且紧密相关的领域。图论研究图的性质和特征&#xff0c;而算法设计和分析解决问题的方法和步骤。图论提供了一种形式化的方法来描述和分析各种关系和连接&#xff0c;而算法则为解决图相关的问题提供了有效的解决…

【刷题之路】单调栈秒解每日温度

一、题目描述 原题链接&#xff1a;https://leetcode.cn/problems/daily-temperatures/ 题目描述&#xff1a; 给定一个整数数组 temperatures &#xff0c;表示每天的温度&#xff0c;返回一个数组 answer &#xff0c;其中 answer[i] 是指对于第 i 天&#xff0c;下一个更高…

【JVM】什么是双亲委派机制?

一、为什么会有这种机制&#xff1f; 类加载器将.class类加载到内存中时&#xff0c;为了避免重复加载&#xff08;确保Class对象的唯一性&#xff09;以及JVM的安全性&#xff0c;需要使用某一种方式来实现只加载一次&#xff0c;加载过就不能被修改或再次加载。 二、什么是双…

docker安装 sqlserver2017 或者 2008

一、必要条件 服务器的运行内存必须大于2GB 二、拉取2017镜像 docker pull mcr.microsoft.com/mssql/server:2017-latest三、启动镜像 docker run --name sqlserver2017 --restart always -v /home/ceshi1:/var/opt/mssql -d -e ACCEPT_EULAY -e SA_PASSWORDqwer!#123 -…

C语言参悟-运算符表达式和语句

C语言参悟-运算符表达式和语句 一、概述二、运算符0. 运算符分类1. 数学运算符2. 逻辑运算符3. 二进制运算符4. 便捷运算符 三、表达式四、语句 一、概述 在我刚开始学C语言的时候&#xff0c;语句和运算符号这些对我来说还是很陌生的。 现在来看我对于这个的理解其实就是一种…

AUTOSAR架构介绍

简介 AUTOSAR&#xff08;AUTomotive Open System ARchitecture&#xff09;是一种面向汽车电子系统的软件架构标准。AUTOSAR为汽车电子系统提供一种开放式的软件架构标准&#xff0c;以促进汽车电子系统的可重用性、互操作性和可扩展性。它包括一系列的规范和标准&#xff0c…

Force Dimension 全系列触觉反馈装置

力觉或触觉是人体感官中具有双向传递信息能力的信息载体。借助于力反馈&#xff0c;人们可以真实的按照人类的肢体语言进行人机自然互动和信息交流&#xff0c;用户通过应用力反馈&#xff0c;可以获得和触摸实际物体时相同的运动感&#xff0c;从而产生更真实的沉浸感。 力反馈…

毕业遭失业,前途一片黑暗...不得已转行软件测试,太多心酸和无助...

大家好&#xff0c;我叫小涵&#xff0c;一名应届毕业生&#xff0c;目前已经成功转行互联网。写这篇文章的目的是因为很多人不喜欢自己的现状&#xff0c;想通过学习改变&#xff0c;奈何没有出路&#xff0c;所以想为这部分人提供一些思路&#xff1b;其次文章会总结我自己转…

汽车电子设计之SBC芯片

参考英飞凌SBC官网资料&#xff1a;https://www.infineon.com/cms/cn/product/automotive-system-ic/system-basis-chips-sbc/ SBC芯片在汽车电子领域可谓占一席之地了。那么什么是SBC&#xff1f;怎么用&#xff1f;用在哪里&#xff1f;主要特性&#xff1f; 目录 1.什么是…

unity Sockets通信 使用UDP协议,设置客户端电脑网络配置,使用新线程获取数据,解决卡顿问题,

今天调试和服务器连接&#xff0c;发现始终获取不到服务器的数据&#xff0c; 电脑和服务器都在同一局域网&#xff0c;仍然获取不到&#xff0c; 下面是电脑环境配置&#xff0c; 第一步&#xff1a; 设置网络为专用网络&#xff0c;然后点击配置防火墙和安全设置&#xff0c;…

Unity——2D小游戏笔记整理

【每日一句&#xff1a;清晨和夜晚都请用尽全力去生活】 目录 一、环境搭建 二、人物 三、相机跟随人物移动 四、平铺精灵 五、血条跟随敌人行走 六、脚本逻辑 【玩家行走方法】 【玩家跳跃方法】 【改变玩家血量值方法】 【创建玩家子弹方法】 【主角血量&#xff…

《计算机网络——自顶向下方法》精炼——3.7(1)

少而好学,如日出之阳;壮而好学,如日中之光;志而好学,如炳烛之光。——刘向 文章目录 拥塞控制方法ATM ABR拥塞控制 TCP拥塞控制TCP拥塞控制算法的实现慢启动拥塞避免快速恢复总结 拥塞控制方法 在上一篇文章中&#xff0c;我们介绍了在数据传输过程中出现的问题。本节将简要介…

Bitbucket 的SSH keys 突然无法识别 -> Permission denied (publickey)问题

Bitbucket 的SSH keys 突然无法识别 -> Permission denied (publickey)问题 用了几年的SSH keys突然出现Permission denied (publickey)问题&#xff0c;如下图所示&#xff1a; 1、首先排查项目权限问题&#xff0c;确认其他账号可更新代码&#xff0c;排除 2、排除SSH k…

【软件测试】白盒测试与黑盒测试

白盒测试与黑盒测试 测试用例定义生成的基本准测设计步骤作用测试数据和测试用例的区别 黑盒测试定义优点缺点黑盒测试的实施过程等价类划分法边界值分析法错误推测法因果图判定表判定表例题 白盒测试介绍覆盖程度基本路径覆盖程序流程图简化成控制流图计算圈复杂度导出测试用例…

chatGPT 学习笔记

学习笔记&#xff1a;chatGPT chatGPT 概述 什么是 chatGPT &#xff1f;(要说明定义、来源、功能和特点) ChatGPT 是 OpenAI 开发的一个大型预训练语言模型&#xff0c;它基于 GPT-3.5 模型&#xff0c;可以在对话中生成类似人类的文本响应&#xff0c;简称对齐。它使用自监…

基于节点分层的配网潮流前推回代方法【IEEE33节点】(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…

java的UDP(一)

文章目录 1. 简介2. UDP客户端3. UDP服务器4. DatagramPacket类 1. 简介 Java中的UDP实现分为两个类&#xff1a;DatagramPacket和DatagramSocket。DatagramPacket类将数据字节填充到UDP包汇总&#xff0c;这称为数据报&#xff0c;由你来解包接收的数据报。DatagramSocket可以…

2023北京大学首席营销官CMO高级研修班简介

【课程背景】自 2022 年以来&#xff0c;营销新趋势呈现以下变化&#xff1a;消费渠道多元化&#xff0c;社会化媒体 成为信息主要渠道&#xff0c;流量红利消失&#xff0c;数字营销成当下主流&#xff1a;元宇宙数字营销成热点&#xff1b;私域运营更受品牌重视&#xff1b;国…