Linux环境下实现并详细分析c/cpp线程池(附源码)

news2024/11/14 12:16:04

一、线程池原理

如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间。

线程池是一种多线程处理形式,处理过程中将任务添加到任务队列,然后在创建线程后自动启动这些任务。

  • 线程池线程都是后台线程。每个线程都使用默认的堆栈大小,以默认的优先级运行,并处于多线程单元中。
  • 如果某个线程在托管代码中空闲(如正在等待某个事件), 则线程池将插入另一个辅助线程来使所有处理器保持繁忙
  • 如果所有线程池线程都始终保持繁忙但队列中包含挂起的工作,则线程池将在一段时间后创建另一个辅助线程但线程的数目永远不会超过最大值。
  • 超过最大值的线程可以排队,但他们要等到其他线程完成后才启动

二、线程池组成

在这里插入图片描述

  1. 任务队列(存储需要处理的任务,由工作的线程来处理这些任务)
  • 通过线程池提供的 API 函数,将一个待处理的任务添加到任务队列或者从任务队列中删除
  • 已处理的任务会被从任务队列中删除
  • 线程池的使用者,也就是调用线程池函数往任务队列中添加任务的线程就是生产者线程
  1. 工作的线程(任务队列任务的消费者,worker) ,N个
  • 线程池中维护了一定数量的工作线程,他们的作用是是不停的读任务队列,从里边取出任务并处理
  • 工作的线程相当于是任务队列的消费者角色,
  • 如果任务队列为空工作的线程将会被阻塞 (使用条件变量 / 信号量阻塞)
  • 如果阻塞之后有了新的任务,由生产者将阻塞解除,工作线程开始工作
  1. 管理者线程(不处理任务队列中的任务),1个
  • 它的任务是周期性的对任务队列中的任务数量以及处于忙状态的工作线程个数进行检测
  • 当任务过多的时候,可以适当的创建一些新的工作线程
  • 当任务过少的时候,可以适当的销毁一些工作的线程

三、线程池C实现

3.1 cmake

cmake_minimum_required(VERSION 2.8.12)
project(Thread_Pool CXX C)
message(STATUS "CMake version: " ${CMAKE_VERSION})
message(STATUS "CMake system name: " ${CMAKE_SYSTEM_NAME})
message(STATUS "CMake system processor: " ${CMAKE_SYSTEM_PROCESSOR})
set(EXECUTABLE_OUTPUT_PATH ${PROJECT_BINARY_DIR}/bin)

include_directories(${CMAKE_SOURCE_DIR}/include)
file(GLOB SRC_FILES
    "${PROJECT_SOURCE_DIR}/src/*.c"
    "${PROJECT_SOURCE_DIR}/src/*.cpp"
)

find_package(Threads REQUIRED)
add_executable(${CMAKE_PROJECT_NAME}  ${SRC_FILES})

target_link_libraries(${PROJECT_NAME} ${CMAKE_THREAD_LIBS_INIT})

target_link_libraries(${CMAKE_PROJECT_NAME} Threads::Threads)
target_link_libraries(${CMAKE_PROJECT_NAME} pthread)

3.2 threadPool.h

1. 创建任务结构体(task类型)

  • 任务的函数指针
  • 任务函数的参数

2. 创建线程池结构体(ThreadPoll类型)

  • 任务队列(任务结构体(Task)的指针(Task *taskQ),本质是一个环形队列)
  • 任务队列的的容量,任务队列头(存放的是任务队列数组的下标),任务队列尾(同上)
  • master线程的ID,worker线程的ID(指针,数组的头)
  • 最小线程数,最大线程数、繁忙线程数、存活线程数、销毁线程数
  • 锁:线程池的锁、繁忙线程数锁
  • 条件变量:队列是不是满、队列是不是空
  • 销毁线程池标志

3. 相关API函数的声明

  • 线程池创建初始化
  • 销毁线程池
  • 线程池添加任务
  • 获取工作线程数、存活线程数
  • 单个线程退出
#ifndef __THREAD_POOL_H__
#define __THREAD_POOL_H__
#include<stdio.h>
#include<stdio.h>
#include<pthread.h>

//任务结构体
struct Task{
    void(*function)(void* arg);  //任务的函数指针
    void* arg;					// 任务函数的参数(void指针类型)
}; 
typedef struct Task Task;
//线程池结构体
struct ThreadPool
{ 
    Task* taskQ;			 /* 任务队列 */
    int queueCapacity;      //容量
    int queueSize;          //当前任务个数
    int queueFront;         //队头 -> 取数据
    int queueRear;          //队尾  -> 放数据
    pthread_t managerID;    //管理者线程ID(只有1个!)
    pthread_t *threadIDs;   //工作的线程ID(有许多个!)
    int minNum;				//最小线程数量
    int maxNum;				//最大线程数量
    int busyNum;			//繁忙的线程数量(当前正在执行工作函数的线程)
    int liveNum;			//存活的线程数量(当前存在的工作线程数,liveNum = busyNum + 没有任务阻塞休眠的线程)
    int exitNum;			//要销毁的线程数,准备要删除的线程数量
    pthread_mutex_t mutexPool; //锁整个线程池 pthread_mutex_t <==> std::mutex(C++11)
    pthread_mutex_t mutexBusy; //锁busyNum变量 pthread_mutex_t <==> std::mutex(C++11)
    pthread_cond_t notFull;    //任务队列是不是满了 pthread_cond_t <==> std::condition_varibale类(C++11)
    pthread_cond_t notEmpty;   //任务队列是不是空了
    int shutdown;              //是不是要销毁线程池,销毁为1,不销毁为0 
 
};
typedef struct ThreadPool ThreadPool;
 
    //创建线程池并且初始化
    ThreadPool* threadPoolCreate(int min,int max,int queueSize);
 
    //销毁线程池
    int threadPoolDestroy(ThreadPool* pool);
 
    //给线程池添加任务
    void threadPoolAdd(ThreadPool* pool,void(*func)(void*),void* arg);
 
    //获取线程池中工作的线程的个数
    int threadPoolBusyNum(ThreadPool* pool);
 
    //获取线程池中活着的线程的个数
    int threadPoolAliveNum(ThreadPool* pool);
 
    //注意:工作的线程一定是活着的!但是,活着的线程不一定在工作!!!
 
    //工作的线程(消费者线程)的任务函数
    void* worker(void* arg);
    //管理者线程任务函数
    void* manager(void* arg);// (主要用于创建和销毁线程池的!)
    //单个线程退出
    void threadExit(ThreadPool* pool);
    
#endif

3.3 threadPool.c

1. threadPoolCreate,创建线程池

  • 输入:工作线程最小数、最大数 , 任务队列大小 ;返回值:线程池指针
  • 1 申请线程池内存,申请max*woker线程的内存,
  • 2 申请pool、busy锁;申请enmpty、full条件变量
  • 3 申请Task * queueSize任务队列内存
  • 4 创建一个master线程、min个woker线程
  • 5 如果申请过程中失败,释放申请内存

2. threadPoolDestroy,销毁线程池

  • 输入:线程池指针;返回值:0
  • 1 销毁线程池标志shutdown置1,
  • 2 回收master线程
  • 3 empty条件变量唤醒worker线程
    • woker线程没有任务的时候,empty条件变量将woker阻塞
    • worker线程因为empty条件变量阻塞有两种唤醒情况:
      • 1、添加新任务,唤醒empty阻塞线程,布置新任务;
      • 2、线程池销毁,唤醒empty阻塞线程,然后判断销毁标志shutdown == 1,woker线程自杀(使用threadExit)
  • 4 释放申请的内存
  • 5 释放锁和条件变量

3. threadPoolAdd,添加任务

  • 输入:线程池指针,任务函数指针,任务函数参数指针;返回值:空
  • 1 如果线程池任务队列数等于最大容量,使用full条件变量阻塞master线程
  • 2 任务队列添加任务(添加任务函数和函数参数)
  • 3 队尾后移(对队列容量取余,实现环形队列),队列大小++
  • 4 empty条件变量唤醒woker线程,之前因为任务队列空而empty条件休眠的woker线程

4. threadPoolBusyNum,获取忙线程个数

  • 输入:线程池指针;返回值:忙线程数

5. threadPoolAliveNum,获取活线程个数

  • 输入:线程池指针;返回值:活线程数

6. worker,工作线程函数

  • 输入:线程池指针;返回值:空
  • 1 判断任务队列是否为空,为空则empty条件变量阻塞线程
  • 2 判断销毁线程数是否大于0,如果大于该worker线程自杀(threadExit函数)
  • 3 判断销毁线程池标志shutdown是否为1,如果是该worker线程自杀(threadExit函数)
  • 4 创建新任务task,从任务队列头中取任务,然后放入新创建的task。
  • 5 移动队列头,queueSize–,busyNum++
  • 6 唤醒full条件变量阻塞的生产者线程,之前add函数中任务队列满了,full条件变量阻塞生产者线程,现在释放
  • 7 执行真正的任务,task.function(task.arg);
  • 8 任务结束busyNum–

7. manager,管理线程函数

  • 输入:线程池指针;返回值:空
  • 1 检测线程池销毁标志shutdown,没有销毁一直检测,每隔一段时间管理一下
  • 2 取出任务队列尺寸,活线程数,忙线程数
  • 3 如果 活线程数 < 最大线程数 && 活线程数 < 队列尺寸,说明忙不过来,添加新woker线程
  • 4 如果 忙线程*2 < 活线程数 && 最小线程数 < 活线程数 ,说明woker过多,设置销毁线程数,然后使用empty条件变量唤醒woker线程,唤醒的worker线程自杀

8. threadExit,线程退出函数

  • 输入:线程池指针;返回值:空
  • 1 获取当前pid
  • 2 在任务队列里对比pid
  • 3 找到自己的pid,改为0,给以后备用
  • 4 线程自杀,pthread_exit(NULL);
#include"../include/thread_pool.h"
#include<string.h>//用memset函数
#include<unistd.h>//用sleep函数
#include<pthread.h>//使用pthread_self() 打印当前线程的id的函数
#include<stdlib.h>//使用malloc操作
const int ADDORDESTROYNUMBER = 2;//每次添加/销毁的线程的number个数

//创建线程池并且初始化
//形参表分别为:最小线程个数,最大线程个数以及队列大小!
    ThreadPool* threadPoolCreate(int min,int max,int queueSize)
    {
        ThreadPool* pool = (ThreadPool*)malloc(sizeof(ThreadPool));
        do{
            if(pool == NULL){//分配内存失败 return 空
                printf("malloc threadpool fail ...\n");
                break;
                // return NULL;
            }
            //succeed to create a thread pool!
            pool->threadIDs = (pthread_t*)malloc(sizeof(pthread_t) * max);//工作线程中,分别Max个空间的heap区数组空间
            if(pool->threadIDs == NULL){
                printf("malloc threadIDs fail ...\n");//分配内存失败 return 空
                break;
                // return NULL;
            }
            //初始化工作的线程IDs们
            memset(pool->threadIDs, 0, sizeof(pthread_t) * max);//memset为cstring中的函数,用来赋值!
            //把线程id数组中的元素全都赋值为0
            pool->minNum = min;
            pool->maxNum = max;
            pool->busyNum = 0;
            pool->liveNum = min;//初始化时以线程的个数最小值来创建活着的线程的个数!
            pool->exitNum = 0;//一开始初始化肯定没有销毁线程的,这个数量要根据程序运行中的状态来decide!
 
 
            if(pthread_mutex_init(&pool->mutexPool,NULL) != 0||
                pthread_mutex_init(&pool->mutexBusy,NULL) != 0||
                pthread_cond_init(&pool->notEmpty,NULL) != 0||
                pthread_cond_init(&pool->notFull,NULL) != 0)
            {
                //此时创建失败
                printf("mutex or condition inti fail ...\n");
                break;
                // return 0;
            }
 
            //此时创建锁mutex和条件变量cond成功!
 
            //然后创建任务队列
            pool->taskQ = (Task*)malloc(sizeof(Task) * queueSize);
            //开辟一块arr内存存放任务队列,arr所存储的任务最大值为容量那么大
            pool->queueCapacity = queueSize;
            pool->queueSize = 0;//当前任务数为0个
            pool->queueFront = 0;//因为没有任务,所有头部执行0index
            pool->queueRear = 0;//因为没有任务,所有尾部执行0index
            
            pool->shutdown = 0;//初始化时肯定不能销毁线程池,所有标记为0(自己规定的)
 
            //创建线程
            pthread_create(&pool->managerID,NULL,manager,pool);//创建管理者这一个线程
            for(int i=0;i<min;i++){//一开始就创建min个线程(这是线程池中的最小线程数!)
                pthread_create(&pool->threadIDs[i],NULL,worker,pool);
            }
 
            //如果能成功执行到这里,那么就表示成功执行了线程池!
            //此时直接返回线程池即可!
            return pool;
        }while(0);//只会执行一次!只要有开辟不成功的case,马上break出while循环!
        
        //下面再进行资源释放的工作!
        if(pool && pool->threadIDs) free(pool->threadIDs);//线程池存在的case下,开辟了线程IDs的空间时,就释放它!
        if(pool && pool->taskQ) free(pool->taskQ);//线程池存在的case下,开辟了taskQ的空间时,就释放它!
        if(pool) free(pool);//释放线程池
 
        return NULL;
    }
 
    //销毁线程池(当线程池被销毁时,线程池中的所有成员都必须被销毁!)
    int threadPoolDestroy(ThreadPool* pool){
        if(pool == NULL) return -1;//    表示此时线程池已空,不需要销毁了
        //  线程池不空(没被销毁时)
        pool->shutdown = 1;//关闭线程池!
 
        //  阻塞回收管理者线程
        pthread_join(pool->managerID,NULL);
 
        //  唤醒阻塞的(活着的)消费者线程
        //  唤醒后他们会自动退出,为什么退出呢?(因为我们写了让他们退出的条件判断代码!)
        for(int i = 0;i < pool->liveNum; i++){
            pthread_cond_signal(&pool->notEmpty);
        }
 
        //  释放申请的堆区内存
        if(pool->taskQ){
            free(pool->taskQ);
            pool->taskQ = NULL;
        }
        if(pool->threadIDs){
            free(pool->threadIDs);
            pool->threadIDs = NULL;
        }
 
 
        //再释放互斥量锁还有条件类锁
        pthread_mutex_destroy(&pool->mutexBusy);
        pthread_mutex_destroy(&pool->mutexPool);
        pthread_cond_destroy(&pool->notEmpty);
        pthread_cond_destroy(&pool->notFull);
 
        free(pool);
        pool = NULL;
        return 0;//return 0 就表示的是成功Destory了线程池并返回了!
    }
 
    //给线程池的任务队列中添加任务
    void threadPoolAdd(ThreadPool* pool,void(*func)(void*),void* arg){
        //由于你给该线程池的任务队列中添加任务时,很有可能此时你正在对任务进行读or写的操作
        //那么因此这里就必须要用线程池的锁mutex_pool来锁住(防止这份共享代码因为OS的调度切换搞乱了!)
        pthread_mutex_lock(&pool->mutexPool);
        while(pool->queueSize == pool->queueCapacity && !pool->shutdown){
            //  此时线程池的任务队列数 = 其最大容量了 并且 该线程池还没有被销毁 时
            
            //  阻塞生产者线程
            pthread_cond_wait(&pool->notFull,&pool->mutexPool);
                
        }
        //  此时被堵塞的生产者线程 被唤醒了(注意:此时该线程还是拿到了mutex互斥锁的状态的!)
 
        //  先判断线程池是否已经被销毁了!
        //  线程池被销毁
        if(pool->shutdown){
            pthread_mutex_unlock(&pool->mutexPool);//   解锁
            return;//   并 退出程序
        }
 
        //  线程池没被销毁时 
        //  给线程池中的任务队列 添加任务
        
        pool->taskQ[pool->queueRear].function = func;
        pool->taskQ[pool->queueRear].arg = arg;
        //  让队尾index (循环)后移!
        pool->queueRear = (pool->queueRear + 1) % pool->queueCapacity;
        pool->queueSize++;//    队列任务+1

        //生产者生产出新的任务,唤醒消费者,消费任务
        pthread_cond_signal(&pool->notEmpty);//pool->notEmpty用来worker中!
        //通知pool->notEmpty这个condition_variable条件变量的对象 返回true 让他唤醒了去工作干活了!
 
        pthread_mutex_unlock(&pool->mutexPool);
    }
 
    //  获取线程池中工作(忙)的线程的个数
    int threadPoolBusyNum(ThreadPool* pool){
        //  注意 读取pool中的成员变量时 也为了防止别的线程给这些变量写or读数据
        //  应该加上锁!
        pthread_mutex_lock(&pool->mutexBusy);
        int busyNum = pool->busyNum;
        pthread_mutex_unlock(&pool->mutexBusy);
        return busyNum;
    }
 
    //  获取线程池中活着的线程的个数
    int threadPoolAliveNum(ThreadPool* pool){
        //  注意 读取pool中的成员变量时 也为了防止别的线程给这些变量写or读数据
        //  应该加上锁!且这里因为没有给aliveNum 定义对应的互斥量 so 直接用线程池的锁即可
        pthread_mutex_lock(&pool->mutexPool);
        int aliveNum = pool->liveNum ;
        pthread_mutex_unlock(&pool->mutexPool);
        return aliveNum ;
    }
 
    //  工作的线程(消费者线程)的任务函数
    void* worker(void* arg){
        //首先把传入进来的参数类型转换为线程池类型
        ThreadPool* pool = (ThreadPool*)arg;
 
        //接下来就算不断地读取任务队列中的任务(每个任务就是一个函数)
 
        while(1){
            pthread_mutex_lock(&pool->mutexPool);//加锁
            //  当前任务队列是否为空
            while(pool->queueSize == 0 && !pool->shutdown)
            {
                //  在当前队列中任务为空 并且 线程池没有被销毁时 就阻塞在这个工作线程中
                pthread_cond_wait(&pool->notEmpty,&pool->mutexPool);//只有当pool->notEmpty的返回值为true时,就继续往下执行!
 
                //  当该哦工作的线程被唤醒时,判断一下该线程是否需要被销毁
                if(pool->exitNum > 0)
                {
                    pool->exitNum--;//每次判断好要让该线程自杀后,必须让exitNum--才行!
                    //  当然,当条件变量wait被唤醒后,肯定是拿到了互斥锁才继续往下面执行的
                    //  此时就必须要把pool->mutexPool这把锁头给unlock解锁一下,再退出该程序,否则你没解锁就退出的话该程序就直接死锁了!
                    if(pool->liveNum > pool->minNum)
                    {
                        pool->liveNum--;
                        pthread_mutex_unlock(&pool->mutexPool);//解锁
                        // pthread_exit(NULL);//退出该线程!
                        threadExit(pool);
                    }
                    
                }
 
            }
 
            //  判断线程池是否被关闭了
            if(pool->shutdown){//   若线程池被销毁了,可以直接退出线程!
                pthread_mutex_unlock(&pool->mutexPool);//解锁
                //  pthread_exit(NULL);//退出线程
                threadExit(pool);
            }
 
            //  若线程池没有被销毁了,可以直接做任务了(消费)
            //  从任务队列中取出一个任务(真正 do things!)
            struct Task task;
            task.function = pool->taskQ[pool->queueFront].function;
            task.arg = pool->taskQ[pool->queueFront].arg;
            //  移动头节点 使之do循环移动 构成一个循环队列 , 取余数的目的的为了形成环形队列,到最后一个进行取余变成0,其他不变
            pool->queueFront = (pool->queueFront+1) % pool->queueCapacity;
            pool->queueSize--;//取出1个任务了,所有--
 
            //解锁,消费者消费成功,唤醒阻塞的生产者生产。
            pthread_cond_signal(&pool->notFull);
            pthread_mutex_unlock(&pool->mutexPool);
 
            //  do 真正的工作(在function中)
            printf("thread %ld start working...\n",pthread_self());
            pthread_mutex_lock(&pool->mutexBusy);
            pool->busyNum++;//工作时++
            pthread_mutex_unlock(&pool->mutexBusy);
            
            //  do things 真正工作的函数进行执行
            task.function(task.arg);// <==> (*task.function)(task.arg);
            //  同时,因为此时这个线程在忙,所有busyNum++且用对应的锁来锁住,防止这个线程的小任务还没真正开始干活呢,
            //  OS就因为调度切换到别的线程去了
 
            free(task.arg);
            task.arg = NULL;
            printf("thread %ld end working...\n",pthread_self());
           
            pthread_mutex_lock(&pool->mutexBusy);
            pool->busyNum--;//工作完成后--
            pthread_mutex_unlock(&pool->mutexBusy);
 
            pthread_mutex_unlock(&pool->mutexPool);//解锁
        }
        return NULL;
    }
    //管理者线程任务函数(主要用于创建和销毁线程池的!)
    void* manager(void* arg){
        ThreadPool* pool = (ThreadPool*)arg;
        while(!pool->shutdown){
            //  只要线程池没有销毁 就继续管理它
            //  每隔3s钟 管理一下
            sleep(3);//linux 下要包含头文件unistd.h才能使用!
            
            //  取出线程池钟的任务数量以及当前线程的数量
            //  (由于你读取的过程钟有可能别的线程正在写数据),为了防止这种case你就必须要上锁才行!
            pthread_mutex_lock(&pool->mutexPool);//加锁
            int queueSize = pool->queueSize;
            int liveNum = pool->liveNum;
            pthread_mutex_unlock(&pool->mutexPool);//解锁
 
 
            //  取出忙的线程的数量
            pthread_mutex_lock(&pool->mutexBusy);//加锁
            int busyNum = pool->busyNum;
            pthread_mutex_unlock(&pool->mutexBusy);//解锁
 
            //  添加线程(此时可以根据你项目的需要来制定相应的添加线程个数的规则)
            
            //  比如:当存活的线程数 < max && 当前存活的线程数 < 当前任务队列钟的任务数量 时 就添加线程!(此时表明当前的线程数已经忙不过来了)
            
            if(liveNum < pool->maxNum && liveNum < queueSize){
                pthread_mutex_lock(&pool->mutexPool);//加锁
                int cnt = 0; 
                for(int i=0;i< pool->maxNum && cnt < ADDORDESTROYNUMBER 
                            && pool->liveNum < pool->maxNum;i++)
                {
                    if(pool->threadIDs[i] == 0)//为0就 表明此时该线程ID可用!(之前没被使用,线程我可以用了!)
                    {
                        //  创建线程元素并放进去数组中!
                        pthread_create(&pool->threadIDs[i],NULL,worker,pool);
                        cnt++;
                        pool->liveNum++;
                    }
                }
                pthread_mutex_unlock(&pool->mutexPool);//解锁
            }
            
            //  销毁线程(此时可以根据你项目的需要来制定相应的销毁线程个数的规则)
            //  比如:当 忙的线程数*2 < 存活的线程数 && 最小线程数 < 存活的线程数 时 就销毁线程
            if(busyNum * 2 < liveNum && pool->minNum < liveNum){
                pthread_mutex_lock(&pool->mutexPool);
                pool->exitNum = ADDORDESTROYNUMBER;//要销毁的线程数字定义为ADDORDESTROYNUMBER(2)
                pthread_mutex_unlock(&pool->mutexPool);
 
                //  让工作的线程自杀
                for(int i=0;i<ADDORDESTROYNUMBER;i++){
                    //唤醒线程,唤醒的是存活且不忙的线程
                    pthread_cond_signal(&pool->notEmpty);// ==> condition_variable中的.notify_one()
                }
            }
        }
        return NULL;
    }
    //单个线程退出
    void threadExit(ThreadPool* pool){
        //先获取当前线程的线程id
        pthread_t threadId = pthread_self();// <==> std::this_thread::get_id();
        for(int i=0;i<pool->maxNum;i++)
        {
            if(pool->threadIDs[i] == threadId)
            {
                pool->threadIDs[i] = 0;//重新置为0,表示该线程ID可在后续被使用了!
                printf("threadExit() called,%ld existing...\n",threadId);
                break;
            }
        }
        pthread_exit(NULL);
    }

四、线程池CPP实现

4.1 任务队列类的声明

// 定义任务结构体
using callback = void(*)(void*);
struct Task
{
    Task()
    {
        function = nullptr;
        arg = nullptr;
    }
    Task(callback f, void* arg)
    {
        function = f;
        this->arg = arg;
    }
    callback function;
    void* arg;
};

// 任务队列
class TaskQueue
{
public:
    TaskQueue();
    ~TaskQueue();

    // 添加任务
    void addTask(Task& task);
    void addTask(callback func, void* arg);

    // 取出一个任务
    Task takeTask();

    // 获取当前队列中任务个数
    inline int taskNumber()
    {
        return m_queue.size();
    }

private:
    pthread_mutex_t m_mutex;    // 互斥锁
    std::queue<Task> m_queue;   // 任务队列
};

4.2 任务队列类的定义

TaskQueue::TaskQueue()
{
    pthread_mutex_init(&m_mutex, NULL);
}

TaskQueue::~TaskQueue()
{
    pthread_mutex_destroy(&m_mutex);
}

void TaskQueue::addTask(Task& task)
{
    pthread_mutex_lock(&m_mutex);
    m_queue.push(task);
    pthread_mutex_unlock(&m_mutex);
}

void TaskQueue::addTask(callback func, void* arg)
{
    pthread_mutex_lock(&m_mutex);
    Task task;
    task.function = func;
    task.arg = arg;
    m_queue.push(task);
    pthread_mutex_unlock(&m_mutex);
}

Task TaskQueue::takeTask()
{
    Task t;
    pthread_mutex_lock(&m_mutex);
    if (m_queue.size() > 0)
    {
        t = m_queue.front();
        m_queue.pop();
    }
    pthread_mutex_unlock(&m_mutex);
    return t;
}

4.3 线程池类的声明

class ThreadPool
{
public:
    ThreadPool(int min, int max);
    ~ThreadPool();

    // 添加任务
    void addTask(Task task);
    // 获取忙线程的个数
    int getBusyNumber();
    // 获取活着的线程个数
    int getAliveNumber();

private:
    // 工作的线程的任务函数
    static void* worker(void* arg);
    // 管理者线程的任务函数
    static void* manager(void* arg);
    void threadExit();

private:
    pthread_mutex_t m_lock;
    pthread_cond_t m_notEmpty;
    pthread_t* m_threadIDs;
    pthread_t m_managerID;
    TaskQueue* m_taskQ;
    int m_minNum;
    int m_maxNum;
    int m_busyNum;
    int m_aliveNum;
    int m_exitNum;
    bool m_shutdown = false;
};

4.4 线程池类的定义

ThreadPool::ThreadPool(int minNum, int maxNum)
{
    // 实例化任务队列
    m_taskQ = new TaskQueue;
    do {
        // 初始化线程池
        m_minNum = minNum;
        m_maxNum = maxNum;
        m_busyNum = 0;
        m_aliveNum = minNum;

        // 根据线程的最大上限给线程数组分配内存
        m_threadIDs = new pthread_t[maxNum];
        if (m_threadIDs == nullptr)
        {
            cout << "malloc thread_t[] 失败...." << endl;;
            break;
        }
        // 初始化
        memset(m_threadIDs, 0, sizeof(pthread_t) * maxNum);
        // 初始化互斥锁,条件变量
        if (pthread_mutex_init(&m_lock, NULL) != 0 ||
            pthread_cond_init(&m_notEmpty, NULL) != 0)
        {
            cout << "init mutex or condition fail..." << endl;
            break;
        }

        /// 创建线程 //
        // 根据最小线程个数, 创建线程
        for (int i = 0; i < minNum; ++i)
        {
            pthread_create(&m_threadIDs[i], NULL, worker, this);
            cout << "创建子线程, ID: " << to_string(m_threadIDs[i]) << endl;
        }
        // 创建管理者线程, 1个
        pthread_create(&m_managerID, NULL, manager, this);
    } while (0);
}

ThreadPool::~ThreadPool()
{
    m_shutdown = 1;
    // 销毁管理者线程
    pthread_join(m_managerID, NULL);
    // 唤醒所有消费者线程
    for (int i = 0; i < m_aliveNum; ++i)
    {
        pthread_cond_signal(&m_notEmpty);
    }

    if (m_taskQ) delete m_taskQ;
    if (m_threadIDs) delete[]m_threadIDs;
    pthread_mutex_destroy(&m_lock);
    pthread_cond_destroy(&m_notEmpty);
}

void ThreadPool::addTask(Task task)
{
    if (m_shutdown)
    {
        return;
    }
    // 添加任务,不需要加锁,任务队列中有锁
    m_taskQ->addTask(task);
    // 唤醒工作的线程
    pthread_cond_signal(&m_notEmpty);
}

int ThreadPool::getAliveNumber()
{
    int threadNum = 0;
    pthread_mutex_lock(&m_lock);
    threadNum = m_aliveNum;
    pthread_mutex_unlock(&m_lock);
    return threadNum;
}

int ThreadPool::getBusyNumber()
{
    int busyNum = 0;
    pthread_mutex_lock(&m_lock);
    busyNum = m_busyNum;
    pthread_mutex_unlock(&m_lock);
    return busyNum;
}


// 工作线程任务函数
void* ThreadPool::worker(void* arg)
{
    ThreadPool* pool = static_cast<ThreadPool*>(arg);
    // 一直不停的工作
    while (true)
    {
        // 访问任务队列(共享资源)加锁
        pthread_mutex_lock(&pool->m_lock);
        // 判断任务队列是否为空, 如果为空工作线程阻塞
        while (pool->m_taskQ->taskNumber() == 0 && !pool->m_shutdown)
        {
            cout << "thread " << to_string(pthread_self()) << " waiting..." << endl;
            // 阻塞线程
            pthread_cond_wait(&pool->m_notEmpty, &pool->m_lock);

            // 解除阻塞之后, 判断是否要销毁线程
            if (pool->m_exitNum > 0)
            {
                pool->m_exitNum--;
                if (pool->m_aliveNum > pool->m_minNum)
                {
                    pool->m_aliveNum--;
                    pthread_mutex_unlock(&pool->m_lock);
                    pool->threadExit();
                }
            }
        }
        // 判断线程池是否被关闭了
        if (pool->m_shutdown)
        {
            pthread_mutex_unlock(&pool->m_lock);
            pool->threadExit();
        }

        // 从任务队列中取出一个任务
        Task task = pool->m_taskQ->takeTask();
        // 工作的线程+1
        pool->m_busyNum++;
        // 线程池解锁
        pthread_mutex_unlock(&pool->m_lock);
        // 执行任务
        cout << "thread " << to_string(pthread_self()) << " start working..." << endl;
        task.function(task.arg);
        delete task.arg;
        task.arg = nullptr;

        // 任务处理结束
        cout << "thread " << to_string(pthread_self()) << " end working...";
        pthread_mutex_lock(&pool->m_lock);
        pool->m_busyNum--;
        pthread_mutex_unlock(&pool->m_lock);
    }

    return nullptr;
}


// 管理者线程任务函数
void* ThreadPool::manager(void* arg)
{
    ThreadPool* pool = static_cast<ThreadPool*>(arg);
    // 如果线程池没有关闭, 就一直检测
    while (!pool->m_shutdown)
    {
        // 每隔5s检测一次
        sleep(5);
        // 取出线程池中的任务数和线程数量
        //  取出工作的线程池数量
        pthread_mutex_lock(&pool->m_lock);
        int queueSize = pool->m_taskQ->taskNumber();
        int liveNum = pool->m_aliveNum;
        int busyNum = pool->m_busyNum;
        pthread_mutex_unlock(&pool->m_lock);

        // 创建线程
        const int NUMBER = 2;
        // 当前任务个数>存活的线程数 && 存活的线程数<最大线程个数
        if (queueSize > liveNum && liveNum < pool->m_maxNum)
        {
            // 线程池加锁
            pthread_mutex_lock(&pool->m_lock);
            int num = 0;
            for (int i = 0; i < pool->m_maxNum && num < NUMBER
                && pool->m_aliveNum < pool->m_maxNum; ++i)
            {
                if (pool->m_threadIDs[i] == 0)
                {
                    pthread_create(&pool->m_threadIDs[i], NULL, worker, pool);
                    num++;
                    pool->m_aliveNum++;
                }
            }
            pthread_mutex_unlock(&pool->m_lock);
        }

        // 销毁多余的线程
        // 忙线程*2 < 存活的线程数目 && 存活的线程数 > 最小线程数量
        if (busyNum * 2 < liveNum && liveNum > pool->m_minNum)
        {
            pthread_mutex_lock(&pool->m_lock);
            pool->m_exitNum = NUMBER;
            pthread_mutex_unlock(&pool->m_lock);
            for (int i = 0; i < NUMBER; ++i)
            {
                pthread_cond_signal(&pool->m_notEmpty);
            }
        }
    }
    return nullptr;
}

// 线程退出
void ThreadPool::threadExit()
{
    pthread_t tid = pthread_self();
    for (int i = 0; i < m_maxNum; ++i)
    {
        if (m_threadIDs[i] == tid)
        {
            cout << "threadExit() function: thread " 
                << to_string(pthread_self()) << " exiting..." << endl;
            m_threadIDs[i] = 0;
            break;
        }
    }
    pthread_exit(NULL);
}

五、线程池实现难点

4.1 empty条件变量

判断任务队列是否满了

  • 阻塞休眠
    • 当worker函数消费者,消耗任务队列为空,empty条件变量让该woker线程阻塞休眠
  • 释放唤醒
    1. 当add函数添加新任务到队列中,empty条件变量唤醒阻塞woker线程,执行新任务
    2. 当threadPoolDestroy函数销毁线程池,shutdown标志为1,empty条件变量唤醒阻塞woker线程。woker线程醒来后发现shutdown标志为1,进行自杀
    3. 当manager函数判断当前woker线程过多,设置销毁线程数exitNum,empty条件变量唤醒阻塞woker线程。woker线程醒来后发现s销毁线程数exitNum 大于0,进行自杀

4.2 full条件变量

判断任务队列是否空了

  • 阻塞休眠
    • 当add函数发现任务队列满了,无法添加新任务时,full条件变量让该生产者线程(调用add函数的线程)阻塞休眠
  • 释放唤醒
    • 当worker函数从任务队列中取出新任务,任务队列有空间的时候,full条件变量唤醒阻塞生产者线程,创建新任务

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/391904.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Unity Animator.Play(stateName, layer, normalizedTime) 播放动画函数用法

原理 接口&#xff1a; public void Play(string stateName, int layer -1, float normalizedTime float.NegativeInfinity);参数含义stateName动画状态机的某个状态名字layer第几层的动画状态机&#xff0c;-1 表示播放第一个状态或者第一个哈希到的状态normalizedTime从s…

spring security 实现自定义认证和登录(4):使用token进行验证

前面我们实现了给客户端下发token&#xff0c;虽然客户端拿到了token&#xff0c;但我们还没处理客户端下一次携带token请求时如何验证&#xff0c;我们想要实现拿得到token之后&#xff0c;只需要验证token&#xff0c;不需要用户再携带用户名和密码了。 1. 禁用 UsernamePass…

崭新的centos虚拟机不能上网

原因 先说点简单的&#xff1a; 没启用虚拟机容器的网络选项虚拟机的网卡没启用手动设置了网关、掩码、dns等没设置对DHCP没开 做法 没启用虚拟机容器的网络选项 在virtualbox里面&#xff0c;开启虚拟机后右下角有个网络选项这里亮着就说明开了&#xff0c;没亮就右键打开…

BufferQueue研究

我们在工作的过程中&#xff0c;肯定听过分析卡顿或者冻屏问题的时候&#xff0c;定位到APP卡在dequeueBuffer方法里面&#xff0c;或者也听身边的同事老说3Buffer等信息。所以3Buffer是什么鬼&#xff1f;什么是BufferQueue?搞Android&#xff0c;你一定知道Graphic Buffer和…

理解js的精度问题

参考博客&#xff1a;js精度丢失问题-看这篇文章就够了(通俗易懂)、探寻 JavaScript 精度问题以及解决方案、JavaScript 浮点数陷阱及解法 1 为什么 JavaScript 中所有数字包括整数和小数都只有一种类型 即 Number类型&#xff0c;它的实现遵循 IEEE 754 标准。 符号位S&#…

MySQL运维篇之Mycat分片规则

3.5.3、Mycat分片规则 3.5.3.1、范围分片 根据指定的字段及其配置的范围与数据节点的对应情况&#xff0c;来决定该数据属于哪一个分片。 示例&#xff1a; 可以通过修改autopartition-long.txt自定义分片范围。 注意&#xff1a; 范围分片针对于数字类型的字段&#xff0c;…

Kubernetes Pod 水平自动伸缩(HPA)

Pod 自动扩缩容 之前提到过通过手工执行kubectl scale命令和在Dashboard上操作可以实现Pod的扩缩容&#xff0c;但是这样毕竟需要每次去手工操作一次&#xff0c;而且指不定什么时候业务请求量就很大了&#xff0c;所以如果不能做到自动化的去扩缩容的话&#xff0c;这也是一个…

IO文件操作

认识文件 狭义的文件 存储在硬盘上的数据,以“文件"为单位,进行组织 常见的就是普通的文件 (文本文件,图片, office系列,视频,音频可执行程序…)文件夹也叫做"目录" 也是一种特殊的文件。 广义的文件 操作系统,是要负责管理软硬件资源&#xff0c;操作系统(…

更高效的跨端开发选择:基于小程序容器的Flutter应用开发

为什么说Flutter是一个强大的跨端框架&#xff1f; Flutter是一个基于Dart编程语言的移动应用程序开发框架&#xff0c;由Google开发。它的强大之处在于它可以快速构建高性能、美观、灵活的跨平台应用程序&#xff0c;适用于Android、iOS、Web、Windows、macOS和Linux等多个平…

Git图解-常用命令操作

目录 一、前言 二、初始化仓库 三、添加文件 四、Git 流程全景图 五、Git工作流程 六、工作区和暂存区 七、查看文件状态 八、查看提交日志 九、查看差异 十、版本回退 十一、管理修改 十二、修改撤销 十三、删除文件 十四、分支管理 十五、项目分支操作 十六、…

Centos7使用OVS桥的方式创建KVM虚拟机

一、OVS使用 1、OVS编译安装 下载ovs2.17版本源码 http://www.openvswitch.org//download/ ./boot.sh ./configure make && make install2、启动OVS服务 &#xff08;1&#xff09;创建文件/etc/systemd/system/openvswitch.service [rootlocalhost qemu]# syste…

Spring Cloud Alibaba全家桶(五)——微服务组件Nacos配置中心

前言 本文小新为大家带来 微服务组件Nacos配置中心 相关知识&#xff0c;具体内容包括Nacos Config快速开始指引&#xff0c;搭建nacos-config服务&#xff0c;Config相关配置&#xff0c;配置的优先级&#xff0c;RefreshScope注解等进行详尽介绍~ 不积跬步&#xff0c;无以至…

【面试题】如何避免使用过多的 if else?

大厂面试题分享 面试题库前后端面试题库 &#xff08;面试必备&#xff09; 推荐&#xff1a;★★★★★地址&#xff1a;前端面试题库一、引言相信大家听说过回调地狱——回调函数层层嵌套&#xff0c;极大降低代码可读性。其实&#xff0c;if-else层层嵌套&#xff0c;如下图…

.NET 8 预览版 1 发布!

.NET 8 是一个长期支持(LTS) 版本。这篇文章涵盖了推动增强功能优先级排序和选择开发的主要主题和目标。.NET 8 预览版和发布候选版本将每月交付一次。像往常一样&#xff0c;最终版本将在 11 月的某个时候在 .NET Conf 上发布。 .NET 版本包括产品、库、运行时和工具&#xf…

JavaSE学习笔记总结day19

今日内容 二、线程安全的集合 三、死锁 四、线程通信 五、生产者消费者 六、线程池 零、 复习昨日 创建线程的几种方式 1) 继承 2) 实现Runnable 3) callable接口 Future接口 4) 线程池 启动线程的方法 start() 线程的几种状态 什么是线程不安全 setName getName Thread.curr…

基于intel soc+fpga智能驾驶舱和高级驾驶辅助系统软件设计(三)

虚拟化操作系统介绍 车载平台有逐渐融合的趋势&#xff0c;车载 SoC 的计算性能和应用快速增长&#xff0c;面临着多种应用在 多个显示子系统融合在一起的问题&#xff0c;这就要求平台运行多个操作系统。虚拟化&#xff08;Virtualization&#xff09; 技术飞速发展&#xff0…

软件测试培训三个月,找到工作了11K,面试总结分享给大家

功能方面&#xff1a;问的最多的就是测试流程&#xff0c;测试计划包含哪些内容&#xff0c;公司人员配置&#xff0c;有bug开发认为不是 bug怎么处理&#xff0c;怎样才算是好的用例&#xff0c;测试用例设计方法&#xff08;等价类&#xff0c;边界值等概念方法&#xff09;&…

ETL的模式以及优缺点

首先&#xff0c;ETL有四种主要实现模式&#xff1a;触发器模式、增量字段、全量同步、日志比对。其次&#xff0c;四种模式的优缺点触发器模式优点&#xff1a;数据抽取的性能高&#xff0c;ETL 加载规则简单&#xff0c;速度快&#xff0c;不需要修改业务系统表结构&#xff…

科目二练习与考试点位总结

一&#xff0c;开车前检查1.调整桌椅。2.调整左右后视镜。3.系安全带。二、倒车入库右边倒车直行至左肩与左虚线重合停车&#xff0c;倒车&#xff0c;左视镜下沿与左虚线重合或10cm左右&#xff0c;方向盘右打满。看右视镜第二个虚线一半回一圈。看右视镜右库角消失右打满。观…

Qt音视频开发20-vlc内核动态保存录像文件(不需要重新编译源码)

一、前言 在vlc默认提供的保存文件方式中&#xff0c;通过打开的时候传入指定的参数来保存文件&#xff0c;直到关闭播放生成文件&#xff0c;这种方式简单暴力&#xff0c;但是不适用大部分的场景&#xff0c;大部分时候需要的是提供开始录制和停止录制的功能&#xff0c;也就…