一个简单实用的线程池及线程池组的实现!

news2024/12/22 18:49:13

1.线程池简介

线程池,顾名思义,就是一个“池子”里面放有多个线程。为什么要使用线程池呢?当我们编写的代码需要并发异步处理很多任务时候,一般的处理办法是一个任务开启一个线程去处理,处理结束后释放线程。可是这样频繁的申请释放线程,系统的开销很大,为了解决这个问题,线程池就产生了。线程池实现原理是事先申请一定数量的线程存放在程序中,当外部有任务需要线程处理时,把这个任务放到这个“池子”里面,“池子”里面空闲的线程就去取这个任务进行处理,这样既能实现多线程并发处理任务,又能减少系统频繁创建删除线程的开销,这种技术叫做池化技术,相应的池化技术还有内存池、连接池等。

为了更形象理解线程池,举一个例子:线程池就像鸡圈里面鸡,每一个鸡比作一个线程,你手里有一把玉米,每一个颗玉米比作一个任务,鸡吃玉米比作处理任务。当你把一把玉米撒入鸡圈,一群鸡就围过来抢玉米,但是一次一只鸡只能吃一颗玉米,吃完一颗继续吃下一颗,直到鸡圈里面的玉米吃完才停止。线程池处理任务也是一样的,当任务链表上有任务时,通过条件变量通知线程池里面的线程,一群线程就围过来了,但是一个线程一次只能取一个任务进行处理,处理完又去取下一个任务,池子里面每一个线程都是这样处理,直到链表上面的任务全部处理完了,池子中的线程又空闲起来了。

2.线程池-设计实现

实现思路:通过向系统申请多个线程,创建一个任务链表,链表上面存放待处理的任务,当有新的任务加入时候,通过条件变量通知池子里面的线程从链表上面取任务,然后处理任务,一直这样循环下去。

​首先定义结构体,定义如下:

/**
 * 定义的回调函数
*/
typedef void (*task_func_t)(void *args);

/**
 * 定义的任务节点结构体
*/
typedef struct task_t
{
    void             *args; //任务参数
    task_func_t      func;  //任务函数指针
    struct list_head node;  //链表节点
}task_t;

/**
 * 线程池信息
*/
typedef struct threadpool_t
{
    struct list_head hlist;       //任务链表
    int              thread_num;  //线程池数量
    int              max_ts_num;  //最大任务数量
    volatile int     curr_ts_num; //当前线程池存在的任务数
    volatile int     is_exit;     //是否退出线程池标志
    pthread_mutex_t  mutex;       //互斥锁
    pthread_cond_t   cond;        //条件变量
    pthread_t        *ths;        //线程id数组
}threadpool_t;

这是线程池实现的程序:

/**
 * @brief:线程处理任务函数
 * @args: 传入的参数
 * @return: NULL
*/
static void* _process_task_thread(void *args)
{
    threadpool_t* tp = (threadpool_t*)args;
    struct list_head *pos = NULL;
    task_t *task=NULL;

    if(!args) return NULL;

    while(1)
    {
        pthread_mutex_lock(&tp->mutex);
        while(list_empty(&tp->hlist) && !tp->is_exit){
            pthread_cond_wait(&tp->cond, &tp->mutex);
        }
        if(tp->is_exit){        //判断释放退出线程池
            pthread_mutex_unlock(&tp->mutex);
            break;
        }
        pos = tp->hlist.next;   //从任务链表取出头节点
        list_del(pos);          //从链表中删除节点
        --tp->curr_ts_num;      //更新任务数
        pthread_mutex_unlock(&tp->mutex);

        task = list_entry(pos, task_t, node); //从链表节点推出任务节点
        task->func(task->args); //执行任务
        free(task);             //释放任务内存
    }

    return NULL;
}

/**
 * @brief:创建一个线程池
 * @thread_nums: 线程数量
 * @max_ts_num:线程池中最大的任务数量
 * @return: 线程池句柄
*/
threadpool_t* create_threadpool(int thread_nums, int max_ts_num)
{
    if(thread_nums <= 0) return NULL;

    threadpool_t* tp = (threadpool_t*)malloc(sizeof(threadpool_t));
    memset(tp, 0, sizeof(threadpool_t));
    INIT_LIST_HEAD(&tp->hlist);

    tp->is_exit = 0;
    tp->curr_ts_num = 0;
    tp->thread_num = thread_nums;
    tp->max_ts_num = max_ts_num;

    tp->ths = (pthread_t*)malloc(sizeof(pthread_t)*thread_nums);
    pthread_mutex_init(&tp->mutex, NULL);
    pthread_cond_init(&tp->cond, NULL);

    for(int i=0; i<tp->thread_num; ++i){
        pthread_create(&(tp->ths[i]), NULL, _process_task_thread, tp);
    }

    return tp;
}   

/**
 * @brief:往线程池中添加任务
 * @tp: 线程池句柄
 * @func:任务处理函数指针
 * @args:传入任务参数
 * @priority: 优先级 1:优先处理 其他:添加到尾部
 * @return: 返回状态 0:ok
*/
int add_task_threadpool(threadpool_t* tp, task_func_t func, void *args, int priority)
{
    if(!tp) return -1;
    if(!func) return -2;
    if(tp->curr_ts_num > tp->max_ts_num) return -3;

    task_t *task = (task_t*)malloc(sizeof(task_t)); //申请任务节点内存
    task->func = func; //给函数指针赋值
    task->args = args; //保持参数指针

    pthread_mutex_lock(&tp->mutex);
    if(priority==1)    //高优先级,添加到头部
        list_add(&task->node, &tp->hlist);
    else               //添加到尾部
        list_add_tail(&task->node, &tp->hlist);
    ++tp->curr_ts_num; //更新任务数
    pthread_mutex_unlock(&tp->mutex);

    pthread_cond_signal(&tp->cond); //通知线程取任务

    return 0;
}

/**
 * @brief:获取线程池中当前存在的任务数量
 * @tp: 线程池句柄
 * @return: 当前任务数量
*/
int get_ts_num_threadpool(threadpool_t* tp)
{
    return tp ? tp->curr_ts_num : -1;
}

/**
 * @brief:释放线程池资源
 * @tp:线程池句柄
 * @return: 0:ok
*/
int destory_threadpool(threadpool_t* tp)
{
    if(!tp) return -1;
    while(!list_empty(&tp->hlist)){  //等待线程池执行完链表中的任务
        continue;   
    }
    tp->is_exit = 1;                  //更新标志,退出线程池
    pthread_cond_broadcast(&tp->cond);//通知所有线程函数

    for(int i=0; i<tp->thread_num; ++i){//等待所有线程函数结束
        pthread_join(tp->ths[i], NULL);
    }

    pthread_mutex_destroy(&tp->mutex); //释放资源
    pthread_cond_destroy(&tp->cond);
    
    free(tp->ths);

    free(tp);
    tp = NULL;

    return 0;
}

相关视频推荐

手把手实现线程池(120行),实现异步操作,解决项目性能问题 

手撕高性能线程池,准备好linux开发环境

线程池在3个开源框架的应用(redis、skynet、workflow)

免费学习地址:c/c++ linux服务器开发/后台架构师

需要C/C++ Linux服务器架构师学习资料加qun812855908获取(资料包括C/C++,Linux,golang技术,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,TCP/IP,协程,DPDK,ffmpeg等),免费分享

3.线程池组-设计实现

有了线程池处理并发任务,为什么还要线程池组呢?原因在于线程池中,所有的线程都使用一个互斥锁阻塞,当你创建的线程池中线程的个数比较多的情况下,存在很多线程对同一个线程抢占,这样会影响线程池取任务处理的效率。因此由"小颗粒"(即每一个线程池中线程个数少)的线程池组成一个"线程池组"这样就能减轻多个线程对同一个锁抢占造成效率低的问题。

设计实现:将多个线程池封装组合到一起,当外部有任务需要处理时,找到线程池组中线程池任务最少的池子,把任务给放进去。

​定义一个线程池组管理结构体:

typedef struct manange_thpool_t
{
    int thpool_nums;                            //线程池个数
    threadpool_t *thpools[MAX_THREADPOOL_NUMS]; //线程池结构体
}manange_thpool_t;

代码实现:

/**
 * @brief:创建线程池组管理句柄
 * @tp_nums:线程池组中线程池个数
 * @thread_num:单个线程池中线程个数
 * @max_ts_n:单个线程池中最大的任务数量
*/
manange_thpool_t* create_group_threadpool(int tp_nums, int thread_num, int max_ts_n)
{
    manange_thpool_t* mtp = (manange_thpool_t*)malloc(sizeof(manange_thpool_t));
    if(!mtp) return NULL;
    memset(mtp, 0, sizeof(manange_thpool_t));
    mtp->thpool_nums = tp_nums;

    for(int i=0; i<tp_nums; ++i){
        mtp->thpools[i] = create_threadpool(thread_num, max_ts_n);
    }

    return mtp;
}

/**
 * @brief:往线程池组中添加任务
 * @mtp:线程池组句柄
 * @func:任务函数
 * @args:任务函数的参数
 * @priority: 优先级 1:优先处理 其他:依次处理
 * @return: 0:ok 其他:err
*/
int add_task_group_threadpool(manange_thpool_t* mtp, task_func_t func, void *args, int priority)\
{
    int ts_num= INT_MAX;
    threadpool_t *tp=NULL;
    int index=0;

    for(register int i=0; i<mtp->thpool_nums; ++i){
        if(mtp->thpools[i]->curr_ts_num < ts_num){
            ts_num = mtp->thpools[i]->curr_ts_num;
            tp = mtp->thpools[i];
            index=i;
        }
    }

    if(!tp){
        tp = mtp->thpools[0];
    }
    return add_task_threadpool(tp, func, args, priority);
}

/**
 * @brief:释放线程池组函数
 * @tp: 线程池组句柄
 * @return:none
*/
void destory_group_threadpool(manange_thpool_t* tp)
{
    if(!tp) return;
    
    for(int i=0; i<tp->thpool_nums; ++i){
        if(tp->thpools[i]) destory_threadpool(tp->thpools[i]);
    }
}

4.测试

测试程序如下:

#include <stdio.h>
#include <unistd.h>
#include "list.h"
#include "threadpool.h"
#include "manange_threadpool.h"

//任务传递的参数
typedef struct info_t
{
    int times;
    char buffer[32];
}info_t;


void task1(void *args)
{
    info_t *info = (info_t*)args;
    printf("handle task1 pid=%lld times=%d buffer=%s\n", pthread_self(), info->times, info->buffer);
    free(args);
}

void task2(void *args)
{
    info_t *info = (info_t*)args;
    printf("handle task2 pid=%lld times=%d buffer=%s\n", pthread_self(), info->times, info->buffer);
    free(args);
}

void task3(void *args)
{
    info_t *info = (info_t*)args;
    printf("handle task3 pid=%lld times=%d buffer=%s\n", pthread_self(), info->times, info->buffer);
    free(args);
}

//------------split-----------------

void test_threadpool(void)
{
    threadpool_t* tp = create_threadpool(4, 128);

    info_t *info;

    for(int t=0; t<10; ++t){
        for(int i=0; i<32; ++i){
            info = (info_t *)malloc(sizeof(info_t));
            info->times=i;
            sprintf(info->buffer, "Test ThreadPool task1 info...");
            add_task_threadpool(tp, task1, info, 1); //往线程池组添加任务

            info = (info_t *)malloc(sizeof(info_t));
            info->times=i;
            sprintf(info->buffer, "Test ThreadPool task2 info...");
            add_task_threadpool(tp, task2, info, 0);

            info = (info_t *)malloc(sizeof(info_t));
            info->times=i;
            sprintf(info->buffer, "Test ThreadPool task3 info...");
            add_task_threadpool(tp, task3, info, 0);
        }
        sleep(1);
    }

    destory_threadpool(tp);
    printf("Test ThreadPool Finish...\n");
}

void test_manange_threadpool(void)
{
    //创建线程池组句柄,有4个线程池,每个线程池使用4线程,每个线程池最大的任务数是32
    manange_thpool_t* mtp = create_group_threadpool(4, 4, 128);
    info_t *info;

    for(int t=0; t<10; ++t){
        for(int i=0; i<32; ++i){
            info = (info_t *)malloc(sizeof(info_t));
            info->times=i;
            sprintf(info->buffer, "Test task1 info...");
            add_task_group_threadpool(mtp, task1, info, 1); //往线程池组添加任务

            info = (info_t *)malloc(sizeof(info_t));
            info->times=i;
            sprintf(info->buffer, "Test task2 info...");
            add_task_group_threadpool(mtp, task2, info, 0);

            info = (info_t *)malloc(sizeof(info_t));
            info->times=i;
            sprintf(info->buffer, "Test task3 info...");
            add_task_group_threadpool(mtp, task3, info, 0);
        }
        sleep(1);
    }

    //释放线程池组资源
    destory_group_threadpool(mtp);
    printf("Test Manage ThreadPool Finish...\n");
}

int main(void)
{

#if 1 //测试单个的线程池功能
    test_threadpool();
#else //测试线程池组功能
    test_manange_threadpool();
#endif

    return 0;
}

通过修改宏定义,决定使用线程池还是线程池组

  1. 测试线程池结果

​2.测试线程池组结果

5.总结

使用线程池情况:一般程序中有并发处理任务,但是处理的任务并发量不高时候采用线程池。

使用线程池组情况:程序中任务并发量很大情况下使用。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/866637.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Docker安装 Kibana

目录 前言安装Kibana步骤1&#xff1a;准备1. 安装docker2. 搜索可以使用的镜像。3. 也可从docker hub上搜索镜像。4. 选择合适的redis镜像。 步骤2&#xff1a;拉取 kibana 镜像拉取镜像查看已拉取的镜像 步骤3&#xff1a;创建容器创建容器方式1&#xff1a;快速创建容器 步骤…

vue父页面给iframe子页面传值

在vue父页面有两个个参数 名称和图标&#xff0c;需要把这两个参数传到iframe的地图里面&#xff0c;在地图触发绘点事件的时候&#xff0c;获取到传来的参数并且展示 vue:传值给子页面iframe // 传值给子页面iframe(2个参数)handleIframeLoad() {const iframeWindow this.$re…

海康威视iVMS综合安防系统任意文件上传(0Day)

漏洞描述 攻击者通过请求/svm/api/external/report接口任意上传文件,导致获取服务器webshell权限,同时可远程进行恶意代码执行。 免责声明 技术文章仅供参考,任何个人和组织使用网络应当遵守宪法法律,遵守公共秩序,尊重社会公德,不得利用网络从事危害国家安全、荣誉和…

安全第二次

一&#xff0c;iframe <iframe>标签用于在网页里面嵌入其他网页。 1&#xff0c;sandbox属性 如果嵌入的网页是其他网站的页面&#xff0c;因不了解对方会执行什么操作&#xff0c;因此就存在安全风险。为了限制<iframe>的风险&#xff0c;HTML 提供了sandb…

HCIA---动态路由---RIP协议

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 目录 前言 一.动态路由 二.动态路由协议分类 IGP&#xff1a;内部网关协议 EGP:外部网关协议 三.RIP协议概述 RIP版本分类&#xff1a; RIP三要素&#xff1a; 思维…

TypeScript 关于对【泛型】的定义使用解读

目录 概念导读泛型函数多个泛型参数泛型约束泛型别名泛型接口泛型类总结&#xff1a; 概念导读 泛型&#xff08;Generics&#xff09;是指在定义函数、接口或类的时候&#xff0c;不预先指定具体的类型&#xff0c;而在使用的时候再指定类型的一种特性。使用泛型 可以复用类型…

【非科班如何丝滑转码?】探索计算机领域跳槽之路

近年来&#xff0c;计算机领域的蓬勃发展吸引着越来越多非计算机科班出身的人士投身其中。本文将就如何顺利实现非科班转码&#xff0c;计算机岗位的发展前景&#xff0c;以及现阶段转码的建议&#xff0c;结合个人经验和观察&#xff0c;为您阐述详细全面的观点。 一、如何规划…

【雕爷学编程】Arduino动手做(201)---行空板开发环境之VSCode

37款传感器与模块的提法&#xff0c;在网络上广泛流传&#xff0c;其实Arduino能够兼容的传感器模块肯定是不止37种的。鉴于本人手头积累了一些传感器和执行器模块&#xff0c;依照实践出真知&#xff08;一定要动手做&#xff09;的理念&#xff0c;以学习和交流为目的&#x…

Kotlin 基础教程一

Kotlin 基本数据类型 Java | Kotlin byte Byte short Short int Int long Long float Float double Double boolean Boolean c…

PHP最简单自定义自己的框架view使用引入smarty(8)--自定义的框架完成

1、实现效果。引入smarty&#xff0c; 实现assign和 display 2、下载smarty&#xff0c;创建缓存目录cache和扩展extend 点击下面查看具体下载使用&#xff0c;下载改名后放到extend PHP之Smarty使用以及框架display和assign原理_PHP隔壁老王邻居的博客-CSDN博客 3、当前控…

xmind用例数据上传至禅道

xmind格式参考&#xff0c;只需要在一级子主题填写对应用例模块ID&#xff0c;其余格式随意即可生成用例并直接上传到禅道&#xff1a; 代码里需填写禅道对应登录账号及用例所属产品 import requests import json import re import hashlib import pprint import threading fr…

并发——Atomic 原子类总结

文章目录 1 Atomic 原子类介绍2 基本类型原子类2.1 基本类型原子类介绍2.2 AtomicInteger 常见方法使用2.3 基本数据类型原子类的优势2.4 AtomicInteger 线程安全原理简单分析 3 数组类型原子类3.1 数组类型原子类介绍3.2 AtomicIntegerArray 常见方法使用 4 引用类型原子类4.1…

段页式储存结构

题目&#xff1a;假设段页式储存结构系统中的地址结构如下图所示 从图中可知段号22-31占了10位&#xff0c;页号12-21占了10位&#xff0c;页内地址0-11占了12位 段&#xff1a; 最多有2^101024个段 页&#xff1a;每段最大允许2^101024个页 页的大小&#xff1a; 2^124x2^1…

【C++精华铺】6.C++类和对象(下)类与对象的知识补充及编译器优化

目录 1. 再谈构造 1.1 成员变量的初始化&#xff08;初始化列表&#xff09; 1.2 初始化列表的行为 1.3 explicit关键字 2. 类中的static成员 2.1 静态成员变量 2.2 静态成员函数 3. 友元 3.1 友元函数 3.1 友元类 4. 内部类 5. 匿名对象 6. 对象拷贝时候的编译器优化…

Linux网络编程 socket编程篇(一) socket编程基础

目录 一、预备知识 1.IP地址 2.端口号 3.网络通信 4.TCP协议简介 5.UDP协议简介 6.网络字节序 二、socket 1.什么是socket(套接字)&#xff1f; 2.为什么要有套接字&#xff1f; 3.套接字的主要类型 拓】网络套接字 三、socket API 1.socket API是什么&#xff1f; 2.为什么…

Qt通过QSS设置QPushButton的样式

同时设置QPushButton的文字样式和图标的方法 为了美化界面&#xff0c;有时候需要修改QPushButton的样式&#xff0c;让一个QPushButton上面既要显示图标&#xff0c;又要显示文字内容 起初我的做法是重写QPushButton&#xff0c;这样做可以实现&#xff0c;但是有几个问题 实现…

【数学建模】逻辑回归算法(Logistic Resgression)

逻辑回归算法 简介逻辑回归与条件概率绘制sigmoid函数 简介 逻辑回归算法是一种简单但功能强大的二元线性分类算法。需要注意的是&#xff0c;尽管"逻辑回归"名字带有“回归”二字&#xff0c;但逻辑回归是一个分类算法&#xff0c;而不是回归算法。 我认为&#xff…

通达OA SQL注入漏洞【CVE-2023-4165】

通达OA SQL注入漏洞【CVE-2023-4165】 一、产品简介二、漏洞概述三、影响范围四、复现环境POC小龙POC检测工具: 五、修复建议 免责声明&#xff1a;请勿利用文章内的相关技术从事非法测试&#xff0c;由于传播、利用此文所提供的信息或者工具而造成的任何直接或者间接的后果及损…

PHP最简单自定义自己的框架实现像TP链式sql语句(六)

1、实现效果&#xff0c;链式sql语句封装 order、where、group、limit等封装 2、数据表构造函数入参&#xff0c;ModelBase.php public $table NULL; public function __construct($table){$this->table$table;if(!$this->table){die("no table" );}$this-&…

C语言----输入scanf和输出printf详解

C语言编程中&#xff0c;输入输出是基本操作&#xff0c;printf和scanf并不是C语言中的唯一的输入输出选择&#xff0c;对于输入有scanf()、getchar()、getche()、getch()、gets()&#xff1b;对于输出有printf()、puts()、putchar()&#xff0c;他们各有自己的使用场景&#x…