一文搞懂系列——Linux C线程池技术

news2024/12/23 13:32:37

背景

最近在走读诊断项目代码时,发现其用到了线程池技术,感觉耳目一新。以前基本只是听过线程池,但是并没有实际应用。对它有一丝的好奇,于是趁这个机会深入了解一下线程池的实现原理。

线程池的优点

线程池出现的背景,其实对应CPU性能优化——“瑞士军刀“文章中提到的短时应用。

即短时间内通过创建线程处理大量请求,但是请求业务的执行时间过短,会造成一些缺陷。

  • 浪费系统资源。比如我们创建一个线程,再销毁一个线程耗时10ms,但是业务的执行时间只有10ms。这就导致系统有效利用率较低。
  • 系统不稳定。如果短时间内,来了大量的请求,每一个请求都通过创建线程的方式执行。可能存在瞬时负载很高,请求响应降低,从而导致系统不稳定。

于是我们可以通过线程池技术,减少线程创建和消耗的耗时,提高系统的资源利用;控制线程并行数量,确保系统的稳定性;

线程池实现

线程池的核心包括以下内容:

  • 线程池任务节点结构。
  • 线程池控制器。
  • 线程池的控制流程。

线程池任务节点结构

线程池任务结点用来保存用户投递过来的的任务,并放入线程池中的线程来执行,任务结构如下:

struct worker_t {
    void * (* process)(void * arg); /*回调函数*/
    int    paratype;                /*函数类型(预留)*/
    void * arg;                     /*回调函数参数*/
    struct worker_t * next;         /*链接下一个任务节点*/
};

线程池控制器

线程池控制器用来对线程池进行控制管理,描述当前线程池的最基本信息,包括任务的投递,线程池状态的更新与查询,线程池的销毁等,其结构如下:

/*线程控制器*/
struct CThread_pool_t {
    pthread_mutex_t queue_lock;     /*互斥锁*/
    pthread_cond_t  queue_ready;    /*条件变量*/
    
    worker_t * queue_head;          /*任务节点链表 保存所有投递的任务*/
    int shutdown;                   /*线程池销毁标志 1-销毁*/
    pthread_t * threadid;           /*线程ID*/
    
    int max_thread_num;             /*线程池可容纳最大线程数*/
    int current_pthread_num;        /*当前线程池存放的线程*/
    int current_pthread_task_num;   /*当前已经执行任务和已分配任务的线程数目和*/
    int current_wait_queue_num;     /*当前等待队列的的任务数目*/
    int free_pthread_num;           /*线程池允许最大的空闲线程数/*/
    
    /**
     *  function:       ThreadPoolAddWorkUnlimit
     *  description:    向线程池投递任务
     *  input param:    pthis   线程池指针
     *                  process 回调函数
     *                  arg     回调函数参数
     *  return Valr:    0       成功
     *                  -1      失败
     */     
    int (* AddWorkUnlimit)(void * pthis, void * (* process)(void * arg), void * arg);
    
    /**
     *  function:       ThreadPoolAddWorkLimit
     *  description:    向线程池投递任务,无空闲线程则阻塞
     *  input param:    pthis   线程池指针
     *                  process 回调函数
     *                  arg     回调函数参数
     *  return Val:     0       成功
     *                  -1      失败
     */     
    int (* AddWorkLimit)(void * pthis, void * (* process)(void * arg), void * arg);
    
    /**
     *  function:       ThreadPoolGetThreadMaxNum
     *  description:    获取线程池可容纳的最大线程数
     *  input param:    pthis   线程池指针
     */     
    int (* GetThreadMaxNum)(void * pthis);
    
    /**
     *  function:       ThreadPoolGetCurrentThreadNum
     *  description:    获取线程池存放的线程数
     *  input param:    pthis   线程池指针
     *  return Val:     线程池存放的线程数
     */     
    int (* GetCurrentThreadNum)(void * pthis);
    
    /**
     *  function:       ThreadPoolGetCurrentTaskThreadNum
     *  description:    获取当前正在执行任务和已经分配任务的线程数目和
     *  input param:    pthis   线程池指针
     *  return Val:     当前正在执行任务和已经分配任务的线程数目和
     */     
    int (* GetCurrentTaskThreadNum)(void * pthis);
    
    /**
     *  function:       ThreadPoolGetCurrentWaitTaskNum
     *  description:    获取线程池等待队列任务数
     *  input param:    pthis   线程池指针
     *  return Val:     等待队列任务数
     */     
    int (* GetCurrentWaitTaskNum)(void * pthis);
    
    /**
     *  function:       ThreadPoolDestroy
     *  description:    销毁线程池
     *  input param:    pthis   线程池指针
     *  return Val:     0       成功
     *                  -1      失败
     */     
    int (* Destroy)(void * pthis);    
};

线程池的控制流程

线程池的控制流程可以分为三个步骤:

  1. 线程池创建。即创建max_num个线程ThreadPoolRoutine,即空闲线程:
/**
 *  function:       ThreadPoolConstruct
 *  description:    构建线程池
 *  input param:    max_num   线程池可容纳的最大线程数
 *                  free_num  线程池允许存在的最大空闲线程,超过则将线程释放回操作系统
 *  return Val:     线程池指针                 
 */     
CThread_pool_t * 
ThreadPoolConstruct(int max_num, int free_num)
{
    int i = 0;
    
    CThread_pool_t * pool = (CThread_pool_t *)malloc(sizeof(CThread_pool_t));
    if(NULL == pool)
        return NULL;
    
    memset(pool, 0, sizeof(CThread_pool_t));
    
    /*初始化互斥锁*/
    pthread_mutex_init(&(pool->queue_lock), NULL);
    /*初始化条件变量*/
    pthread_cond_init(&(pool->queue_ready), NULL);
    
    pool->queue_head                = NULL;
    pool->max_thread_num            = max_num; // 线程池可容纳的最大线程数
    pool->current_wait_queue_num    = 0;
    pool->current_pthread_task_num  = 0;
    pool->shutdown                  = 0;
    pool->current_pthread_num       = 0;
    pool->free_pthread_num          = free_num; // 线程池允许存在最大空闲线程
    pool->threadid                  = NULL;
    pool->threadid                  = (pthread_t *)malloc(max_num*sizeof(pthread_t));
    /*该函数指针赋值*/
    pool->AddWorkUnlimit            = ThreadPoolAddWorkUnlimit;
    pool->AddWorkLimit              = ThreadPoolAddWorkLimit;
    pool->Destroy                   = ThreadPoolDestroy;
    pool->GetThreadMaxNum           = ThreadPoolGetThreadMaxNum;
    pool->GetCurrentThreadNum       = ThreadPoolGetCurrentThreadNum;
    pool->GetCurrentTaskThreadNum   = ThreadPoolGetCurrentTaskThreadNum;
    pool->GetCurrentWaitTaskNum     = ThreadPoolGetCurrentWaitTaskNum;
    
    for(i=0; i<max_num; i++) {
        pool->current_pthread_num++;    // 当前池中的线程数
        /*创建线程*/
        pthread_create(&(pool->threadid[i]), NULL, ThreadPoolRoutine, (void *)pool);
        usleep(1000);        
    }
    
    return pool;
}
  1. 投递任务。即将任务生产者,将任务节点投入线程池中。实现如下:
/**
 *  function:       ThreadPoolAddWorkLimit
 *  description:    向线程池投递任务,无空闲线程则阻塞
 *  input param:    pthis   线程池指针
 *                  process 回调函数
 *                  arg     回调函数参数
 *  return Val:     0       成功
 *                  -1      失败
 */     
int
ThreadPoolAddWorkLimit(void * pthis, void * (* process)(void * arg), void * arg)
{ 
    // int FreeThreadNum = 0;
    // int CurrentPthreadNum = 0;
    
    CThread_pool_t * pool = (CThread_pool_t *)pthis;
    
    /*为添加的任务队列节点分配内存*/
    worker_t * newworker  = (worker_t *)malloc(sizeof(worker_t)); 
    if(NULL == newworker) 
        return -1;
    
    newworker->process  = process;  // 回调函数,在线程ThreadPoolRoutine()中执行
    newworker->arg      = arg;      // 回调函数参数
    newworker->next     = NULL;      
    
    pthread_mutex_lock(&(pool->queue_lock));
    
    /*插入新任务队列节点*/
    worker_t * member = pool->queue_head;   // 指向任务队列链表整体
    if(member != NULL) {
        while(member->next != NULL) // 队列中有节点
            member = member->next;  // member指针往后移动
            
        member->next = newworker;   // 插入到队列链表尾部
    } else 
        pool->queue_head = newworker; // 插入到队列链表头
    
    assert(pool->queue_head != NULL);
    pool->current_wait_queue_num++; // 等待队列加1
    
    /*空闲的线程= 当前线程池存放的线程 - 当前已经执行任务和已分配任务的线程数目和*/
    int FreeThreadNum = pool->current_pthread_num - pool->current_pthread_task_num;
    /*如果没有空闲线程且池中当前线程数不超过可容纳最大线程*/
    if((0 == FreeThreadNum) && (pool->current_pthread_num < pool->max_thread_num)) {  //-> 条件为真进行新线程创建
        int CurrentPthreadNum = pool->current_pthread_num;
        
        /*新增线程*/
        pool->threadid = (pthread_t *)realloc(pool->threadid, 
                                        (CurrentPthreadNum+1) * sizeof(pthread_t));
                                        
        pthread_create(&(pool->threadid[CurrentPthreadNum]),
                                              NULL, ThreadPoolRoutine, (void *)pool);
        /*当前线程池中线程总数加1*/                                   
        pool->current_pthread_num++;
        
        /*分配任务线程数加1*/
        pool->current_pthread_task_num++;
        pthread_mutex_unlock(&(pool->queue_lock));
        
        /*发送信号给一个处与条件阻塞等待状态的线程*/
        pthread_cond_signal(&(pool->queue_ready));
        return 0;
    }
    
    pool->current_pthread_task_num++;
    pthread_mutex_unlock(&(pool->queue_lock));
    
    /*发送信号给一个处与条件阻塞等待状态的线程*/
    pthread_cond_signal(&(pool->queue_ready));
//  usleep(10);  //看情况  
    return 0;
}
  1. 线程执行。即每一个线程的执行逻辑。实现如下:
/**
 *  function:       ThreadPoolRoutine
 *  description:    线程池中执行的线程
 *  input param:    arg  线程池指针
 */     
void * 
ThreadPoolRoutine(void * arg)
{
    CThread_pool_t * pool = (CThread_pool_t *)arg;
    
    while(1) {
        /*上锁,pthread_cond_wait()调用会解锁*/
        pthread_mutex_lock(&(pool->queue_lock));
        
        /*队列没有等待任务*/
        while((pool->current_wait_queue_num == 0) && (!pool->shutdown)) {
            /*条件锁阻塞等待条件信号*/
            pthread_cond_wait(&(pool->queue_ready), &(pool->queue_lock));
        }
        
        if(pool->shutdown) {
            pthread_mutex_unlock(&(pool->queue_lock));
            pthread_exit(NULL);         // 释放线程
        }
        
        assert(pool->current_wait_queue_num != 0);
        assert(pool->queue_head != NULL);
        
        pool->current_wait_queue_num--; // 等待任务减1,准备执行任务
        worker_t * worker = pool->queue_head;   // 去等待队列任务节点头
        pool->queue_head = worker->next;        // 链表后移     
        pthread_mutex_unlock(&(pool->queue_lock));
        
        (* (worker->process))(worker->arg);      // 执行回调函数
        
        pthread_mutex_lock(&(pool->queue_lock));
        pool->current_pthread_task_num--;       // 函数执行结束
        free(worker);   // 释放任务结点
        worker = NULL;
        
        if((pool->current_pthread_num - pool->current_pthread_task_num) > pool->free_pthread_num) {
            pthread_mutex_unlock(&(pool->queue_lock));
            break;  // 当线程池中空闲线程超过 free_pthread_num 则将线程释放回操作系统
        }
        pthread_mutex_unlock(&(pool->queue_lock));    
    }
    
    pool->current_pthread_num--;    // 当前线程数减1
    pthread_exit(NULL);             // 释放线程
    
    return (void *)NULL;
}

这个就是用来执行任务的线程,在初始化创建线程时所有线程都全部阻塞在pthread_cond_wait()处,此时的线程就为空闲线程,也就是线程被挂起,当收到信号并取得互斥锁时,表明任务投递过来
则获取等待队列里的任务结点并执行回调函数;函数执行结束后回去判断当前等待队列是否还有任务,有则接下去执行,否则重新阻塞回到空闲线程状态。

若我的内容对您有所帮助,还请关注我的公众号。不定期分享干活,剖析案例,也可以一起讨论分享。
我的宗旨:
踩完您工作中的所有坑并分享给您,让你的工作无bug,人生尽是坦途

在这里插入图片描述

总结

实际上,我觉得在诊断项目中,线程池技术是非必要的。因此它不会涉及到大量的请求,以及每一个请求处理,一般都会比较耗时。

参考:https://www.cnblogs.com/zhaoosheLBJ/p/9337291.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1387302.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Lede(OpenWrt)安装和双宽带叠加

文章目录 一、Lede介绍1. 简介2. 相关网站 二、Lede安装1. 编译环境2. SHELL编译步骤3. 腾讯云自动化助手 三、Lede配置1. 电信接口配置2. 联通接口配置3. 多线多播配置4. 网速测试效果 一、Lede介绍 1. 简介 LEDE是一个专为路由器和嵌入式设备设计的自由和开源的操作系统。 …

HTML--JavaScript--引入方式

啊哈~~~基础三剑看到第三剑&#xff0c;JavaScript HTML用于控制网页结构 CSS用于控制网页的外观 JavaScript用于控制网页的行为 JavaScript引入方式 引入的三种方式&#xff1a; 外部JavaScript 内部JavaScript 元素事件JavaScript 引入外部JavaScript 一般情况下网页最好…

【动态规划】19子数组系列_最大子数组和_C++(medium)

题目链接&#xff1a;leetcode最大子数组和 目录 题目解析&#xff1a; 算法原理 1.状态表示 2.状态转移方程 3.初始化 4.填表顺序 5.返回值 编写代码 题目解析&#xff1a; 题目让我们找出一个具有最大和的连续子数组&#xff0c;返回其最大和。 算法原理: 1.状态表示…

城市信息模型平台顶层设计与实践-CIM-读书笔记

城市信息模型平台顶层设计与实践-CIM-读书笔记 1、地理空间框架 GB/T 30317—2013《地理空间框架基本规定》规定地理空间框架为&#xff1a;“地理信息数据及其采集、加工、交换、服务所涉及的政策、法规、标准、技术、设施、机制和人力资源的总称&#xff0c;由基础地理信息…

显示文件前后内容 (来get小命令 tr 、 cut……)

一、 命令 tr -------基本功能转换 tr&#xff1a;对文件字符进行处理 格式 tr [选项]... SET1 [SET2] SET 是一组字符串&#xff0c;一般都可按照字面含义理解 -d 删除 -s 压缩 -c 用字符串1中字符集的补集替换此字符集&#xff0c;要求字符集为ASCII。 tr 1 a 出现…

【LeetCode】206. 反转链表(简单)——代码随想录算法训练营Day03

题目链接&#xff1a;206. 反转链表 题目描述 给你单链表的头节点 head &#xff0c;请你反转链表&#xff0c;并返回反转后的链表。 示例 1&#xff1a; 输入&#xff1a;head [1,2,3,4,5] 输出&#xff1a;[5,4,3,2,1] 示例 2&#xff1a; 输入&#xff1a;head [1,2] 输…

今天吃什么小游戏(基于Flask框架搭建的简单应用程序,用于随机选择午餐选项。代码分为两部分:Python部分和HTML模板部分)

今天吃什么 一个简单有趣的外卖点饭网站&#xff0c;不知道吃什么的时候&#xff0c;都可以用它自动决定你要吃的&#xff0c;包括各种烧烤、火锅、螺蛳粉、刀削面、小笼包、麦当劳等午餐全部都在内。点击开始它会随意调出不同的午餐&#xff0c;点击停止就会挑选一个你准备要吃…

数据结构学习 jz53_1 在排序数组中查找数字1 0 ~ n - 1 中缺失的数字

关键词&#xff1a;查找算法 二分法 映射 位运算 题目一&#xff1a;统计目标成绩的出现次数 方法一&#xff1a;我自己写的。[ 用时: 13 m 3 s ] 二分法线性扫描 方法二&#xff1a;看了题解 方法一&#xff1a; 二分法线性查找 思路&#xff1a; 先二分查找找到和targe…

如何购买腾讯云服务器?图文教程超详细

腾讯云服务器购买流程很简单&#xff0c;有两种购买方式&#xff0c;直接在官方活动上购买比较划算&#xff0c;在云服务器CVM或轻量应用服务器页面自定义购买价格比较贵&#xff0c;但是自定义购买云服务器CPU内存带宽配置选择范围广&#xff0c;活动上购买只能选择固定的活动…

在windows11系统上利用docker搭建ubuntu记录

我的windows11系统上&#xff0c;之前已经安装好了window版本的docker&#xff0c;没有安装的小伙伴需要去安装一下。 下面直接记录安装linux的步骤&#xff1a; 一、创建linux容器 1、拉取镜像 docker pull ubuntu 2、查看镜像 docker images 3、创建容器 docker run --…

Jmeter 测试脚本录制器-HTTP 代理服务器

Jmeter 测试脚本录制器-HTTP 代理服务器 Jmeter 配置代理服务器代理服务器获取请求地址示例图配置步骤 浏览器配置代理Google 浏览器插件配置代理windows 本地网络配置代理 启动录制&#xff0c;生成证书生成证书导入证书Jmeter 配置证书 浏览器点击页面&#xff0c;录制请求地…

Zabbix的多场景应用

1 zabbix更多用法 1.1 自动注册方式 zabbix自动发现 zabbix server服务端主动发现zappix agent客户端 1&#xff09;在【配置】-【自动发现】创建 发现规则&#xff0c;设置 IP范围 检查的键值system.uname 2&#xff09;在【配置】-【动作】-【发现动作】创建 动作&#x…

RabbitMQ详解(值得珍藏)

1. 基本概念 RabbitMQ是一款开源&#xff0c;使用Erlang编写的&#xff0c;基于AMQP协议的消息中间件&#xff1b; 提到RabbitMQ&#xff0c;就不得不提AMQP协议。AMQP协议是具有现代特征的二进制协议。是一个提供统一消息服务的应用层标准高级消息队列协议&#xff0c;是应用…

基于springboot书籍学习平台源码和论文

首先,论文一开始便是清楚的论述了平台的研究内容。其次,剖析平台需求分析,弄明白“做什么”,分析包括业务分析和业务流程的分析以及用例分析,更进一步明确平台的需求。然后在明白了平台的需求基础上需要进一步地设计平台,主要包罗软件架构模式、整体功能模块、数据库设计。本项…

2024年腾讯云服务器配置价格表(机型/磁盘/宽带/CPU)

腾讯云服务器租用价格表&#xff1a;轻量应用服务器2核2G3M价格62元一年、2核2G4M价格118元一年&#xff0c;540元三年、2核4G5M带宽218元一年&#xff0c;2核4G5M带宽756元三年、轻量4核8G12M服务器446元一年、646元15个月&#xff0c;云服务器CVM S5实例2核2G配置280.8元一年…

我的NPI项目之设备系统启动(三) -- CDT的一个实例

上面说了这么多&#xff0c;这里就添加一个CDT的使用实例和简单的代码解析。 首先生成cdt_configure.xml配置文件&#xff0c;然后执行如下命令&#xff1a; python cdt_generator.py cdt_configure.xml CDT.bin; 就可以生成对应的CDT.bin文件。同时也会生成, 我们会利用ha…

【新】Unity Meta Quest MR 开发(一):Passthrough 透视配置

文章目录 &#x1f4d5;教程说明&#x1f4d5;配置透视的串流调试功能&#x1f4d5;第一步&#xff1a;设置 OVRManager&#x1f4d5;第二步&#xff1a;添加 OVRPassthroughLayer 脚本&#x1f4d5;第三步&#xff1a;在场景中添加虚拟物体&#x1f4d5;第四步&#xff1a;设置…

Nginx——强化基础配置

1、牢记Context Context是Nginx中每条指令都会附带的信息&#xff0c;用来说明指令在哪个指令块中使用&#xff0c;可以将Context 理解为配置环境。 每个指令都拥有自己的配置环境&#xff0c;如果把配置环境记错了&#xff0c;或者在设计时未考虑配置环境的作用&#xff0c;…

Android基于Matrix绘制PaintDrawable设置BitmapShader,以手指触点为中心显示原图的圆切图,Kotlin(4)

Android基于Matrix绘制PaintDrawable设置BitmapShader&#xff0c;以手指触点为中心显示原图的圆切图&#xff0c;Kotlin&#xff08;4&#xff09; 这篇 Android基于Matrix绘制PaintDrawable设置BitmapShader&#xff0c;以手指触点为中心显示原图像圆图&#xff0c;Kotlin&am…

使用ChatGPT对进行论文改写与润色

一、内容改写 关键在于明确改写的具体要求。 例如:[论文内容] 可以指明需要提升该段落的流畅性和逻辑连贯性。 常用指令 细微调整文本 轻微编辑 重写以增强表述清晰度 简化句式 校正语法和拼写错误 提升文本的流畅性和条理性 优化词汇使用 调整文本风格 进行深度编辑…