线程池的实现和讲解:解决多线程并发服务器创建销毁线程消耗过大的问题

news2024/11/25 9:39:00

1.前言

多进程/线程并发服务器、多路I/O转接服务器的简单实现-CSDN博客

原先的多线程并发服务器,有多少个客户端连接服务器就有多少个线程,CPU需要在多个线程之间来回切换处理客户端的请求,系统消耗比较大(每次创建和消耗线程在操作系统内部消耗大)

而select、epoll的多路IO转接服务器,只需要一个进程就可以了,交给了内核维护,系统开销比较小

多进程也是,只不过多个进程的使得可使用的文件描述符变多,能同时处理的客户端数量变得更多,当然资源消耗也大

为了解决多线程每次线程创建好处理完客户端的数据后就销毁线程导致消耗过大,就可以采用线程池的方法,一次性创建一堆线程:

1. 预先创建阻塞于accept多线程,使用互斥锁上锁保护accept(server和clien通信之前就创建好的)

2. 预先创建多线程,由主线程调用accept

client1连接完后,pthread_cond_wati让线程1释放锁阻塞等 待任务队列中任务的到来,client1有数据传给server了,也就是到达server的任务拿到锁了将任务写进了任务队列当中,server用pthread_cond_signal唤醒正在阻塞等待的线程1去处理client的数据,处理完后重新上锁,将线程1还回给线程池而不是去销毁掉,避免掉了开销

线程池解决server创建线程处理数据时服务器的开销

select等则是解决多个client如何去和server建立连接的请求(accept的阻塞性质)

对于线程池应该有动态的机制,当客户量访问太多时,也就是正在忙活的线程数量pthread_busy_num过多,快接近目前线程池存储的线程总数量pthread_live_num时,需要对其进行扩容;当客户量变少时,对线程的数量pthread_live_num进行减少; -- 因此需要专门有一个管理线程来管理线程池

2.案例讲解代码

#define _CRT_SECURE_NO_WARNINGS 1



#include <stdlib.h>
#include<pthread.h>
#include<unistd.h>
#include<assert.h>
#include<stdio.h>
#include<string.h>
#include<signal.h>
#include<errno.h>
#include"threadpool.h"

#define DEFAULT_TIME 10  /*10s检测一次*/
#define MIN WAIT_TASK_NUM 10  /*如果queve size > MIN WAIT TASK NUM 添加新的线程到线程池*/
#define DEFAULT_THREAD_VARY 10 /*每次创建和销毁线程的个数*/
#define true 1
#define false 0

/* A */
typedef struct {
    void* (*function)(void*);  /*函数指针,回调函数 */
    void* arg;   /*上面函数的参数 */
}threadpool_task_t;  /*各子线程任务结构体 */

/*描述符线程池相关信息*/

struct threadpool_t {
    pthread_mutex_t lock;            //用于锁住本结构体
    pthread_mutex_t thread_counter;  //记录忙状态线程个数de琐-- busy_thr_num 

    pthread_cond_t queue_not_full;   //当任务队列满时,添加任务的线程阻塞,等待此条件变量
    pthread_cond_t queue_not_empty;  //任务队列里不为空时,通知等待任务的线程

    pthread_t* threads;              //存放线程池中每个线程的tid。数组
    pthread_t adjust_tid;            //存管理线程tid
    threadpool_task_t* task_queue;   //任务队列(数组首地址)

    int min_thr_num;                 //线程池最小线程数 
    int max_thr_num;                 //线程池最大线程数
    int live_thr_num;                //当前存活线程个数 
    int busy_thr_num;                //忙状态线程个数
    int wait_exit_thr_num;           //要销毁的线程个数

    int queue_front;                 //task_queue队头下标 
    int queue_rear;                  //task queue队尾下标
    int queue_size;                  //task_queue队中实际任务数
    int queue_max_size;              //task queue队列可容纳任务数上限 

    int shutdown;                    //标志位,线程池使用状态,true或false
};

void* adjust_thread(void* threadpool);
int is_thread_alive(pthread t tid);
int threadpool_free(threadpool t* pool);

/* C */
//threadpool create(3,100,100);  线程池的创建
threadpool_t* threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size)
{
    int i;
    threadpool _t* pool = NULL;  /*线程池 结构体 */

    do {
        if ((pool = (threadpool_t*)malloc(sizeof(threadpool_t))) == NULL) {
            printf("malloc threadpool fail");
            break; /*跳出do while*/
        }
        pool->min_thr_num = min_thr_num;  //最小线程数
        pool->max_thr_num = max_thr_num;   //最大线程数
        pool->busy_thr_num = 0;    //忙活的线程
        pool->live_thr_num = min_thr_num;        /*活着的线程数 初值=最小线程数 */
        pool->wait_exit_thr_num = 0;   //要销毁的线程数
        pool->queue_size = 0;                     /*有0个产品 */
        pool->queue_max_size = queue_max_size;    /*最大任务队列数 */
        pool->queue_front = 0;   //任务队头
        pool->queue_rear = 0;  //任务队尾
        pool->shutdown = false;  /*不关闭线程池 */

        /*根据最大线程上限数给工作线程数组开辟空间,并清零*/
        pool->threads = (pthread_t*)malloc(sizeof(pthread_t) * max_thr_num);
        if (pool->threads == NULL) {
            printf("malloc threads fail");
            break;
        }
        memset(pool->threads, 0, sizeof(pthread t) * max_thr_num);

        /*给 任务队列 开辟空间 */
        pool->task_queue = (threadpool_task_t*)malloc(sizeof(threadpool_task_t) * queue_max_size);
        if (pool->task_queue == NULL) {
            printf("malloc task_queue fail");
            break;
        }

        /*初始化互斥琐、条件变量:两个锁和两个条件变量 */
        /*
            pthread_mutex_t lock;            //用于锁住本结构体
            pthread_mutex_t thread_counter;  //记录忙状态线程个数de琐-- busy_thr_num

            pthread_cond_t queue_not_full;   //当任务队列满时,添加任务的线程阻塞,等待此条件变量
            pthread_cond_t queue_not_empty;  //任务队列里不为空时,通知等待任务的线程
        */
        if (pthread_mutex_init(&(pool->lock), NULL) != 0
            || pthread_mutex_init(&(pool->thread_counter), NULL) != 0
            || pthread_cond_init(&(pool->queue_not_empty), NULL) != 0
            || pthread_cond_init(&(pool->queue_not_full), NULL) != 0)
        {
            printf("init the lock or cond fail");
            break;
        }

        /*启动 min_thr_num个work thread */
        //在线程池pool->threads中开启最小个数线程
        for (i = 0; i < min_thr_num; i++) {
            //int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine) (void *), void *arg);
            pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void*)pool); /*pool指向当前线程池*/
            //每次创建线程就将该线程池pool传给子线程(回调函数的参数),使子线程对线程池pool的数据进行更新
            //比如创建子线程后将pid存储到pool->thread[i]中 -- 回调函数的工作
            printf("start thread 0x%x...\n", (unsianed int)pool->threadsr[i]);
        }
        pthread_create(&(pool->adjust_tid), NULLL, adjust_thread, (void*)pool);  //创建管理者线程

        return pool;
    } while (0);

    threadpool_free(pool);

    return NULL;

}


/* D */
//线程池中各个工作线程 -- 线程创建的回调函数
void* threadpool thread(void* threadpool) {
    threadpool_t* pool = (threadpool t*)threadpool;
    threadpool_task_t task; //各个子线程的任务结构体:函数指针--回调函数;回调函数的参数arg

    while (true) {
        /*Lock must be taken to wait on conditional variable */
        /*刚创建出线程,等待任务队列里有任务,否则阻塞等待任务队列里有任务后再唤醒接收任务*/
        pthread_mutex_lock(&(pool->lock));

        /*queue_size == 0 说明没有任务,调 wait 阻塞在条件变量上,若有任务,跳过该while*/
        while ((pool->queue_size == 0) && (!pool->shutdown)) {
            printf("thread @x%x is waiting\n", (unsigned int)pthread_self());
            pthread _cond_wait(&(pool->queue_not_empty), &(pool->lock));

            // --- 看分析G 下面是销毁线程池时会调用到 146-166  --- //
               /*清除指定数目的空闲线程,如果要结束的线程个数大于日,结束线程 -- 用于搜身的*/
            if (pool->wait_exit_thr_num > 0) {
                pool->wait_exit_thr_num--;

                /*如果线程池里线程个数大于最小值时可以结束当前线程*/
                if (pool->live_thr_num > pool->min_thr_num) {
                    printf("thread 0x%x is exiting\n", (unsigned int)pthread self());
                    pool->live_thr_num--;
                    pthread_mutex_unlock(&(pool->lock));
                    pthread_exit(NULL);
                }
            }
        }
        /*如果指定了true,要关闭线程池里的每个线程,自行退出处理---销毁线程池*/
        if (pool->srutdown) {
            pthread_mutex_unlock(&(pool->lock));
            printf("thread 0x%x is exiting\n", (unsigned int)pthread_self());
            pthread_detach(pthread_self());
            pthread_exit(NULL);  /*线程自行结束 */
        }

        /*从任务队列里获取任务,是一个出队操作*/
        task.function = pool->task queue[pool->queue front].function;
        task.arg = pool->task_queue[pool->queue_front].arg;
        pool->queue_front = (pool->queue_front + 1) % pool->queue _max_size;/*出队,模拟环形队列 */
        pool->queue size--;

        /*通知可以有新的任务添加进来*/
        pthread_cond_broadcast(&(pool->queue_not_full));

        /*任务取出后,立即将 线程池琐 释放*/
        pthread_mutex_unlock(&(pool->lock));

        /*执行任务*/
        printf("thread 0x%x start working\n",(unsigned int)pthread_self());
        pthread_mutex_lock(&(p0ol->thread_counter);  /*忙状态线程数变量琐*/
        pool->busy_thr_num++;   /*忙状态线程数+1*/
        pthread_mutex_unlock(&(pool->thread_counter));

        (*(task.function))(task.arg);  /*执行回调函数任务*/
        //task.function(task.arg);  /*执行回调函数任务*/

        /*任务结束处理*/
        printf("thread 0x%x end working\n", (unsigned int)pthread_self());
        pthread_mutex_lock(&(pool->thread counter));
        pool->busy thr num--;   /*处理掉一个任务,忙状态数线程数-1*/
        pthread_mutex_unlock(&(pool->thread_counter));
    }
    pthread_exit(NULL);
}

/* E 管理线程 */
void* adjust_thread(void* threadpool) {
    int i;
    threadpool_t* pool = (threadpool_t*)threadpool;
    while (!pool->shutdown) {
        sleep(DEFAULT_TIME);      //定时对线程池管理

        pthread_mutex_lock(8(pool->lock));
        int queue_size = pool->queue_size;   //关注 任务数
        int live_thr_nun = pool->live_thr_num;  //存活 线程数
        pthread_mutex_unlock(&(pool->lock));

        pthread_mutex_lock(&(pool->thread counter));
        int busy_thr_num = pool->busy_thr_num;  //忙着的线程数
        pthread_mutex_unlock(&(pool->thread_counter));

        /*创建新线程 算法 : 任务数大于最小线程池个数,且存活的线程数少于最大线程个数时 如:30>=10 && 40<100*/
        if (queue size >= MIN WAIT TASK NUM && live thr num < pool->max thr _num) {
            pthread mutex lock(&(pool->lock));
            int add = 0;
            /*一次增加 DEFAULT THREAD 个线程*/
            for (i = 0; i < pool->max_thr_num && add < DEFAULT_THREAD_VARY
                && pool->live_thr_num < pool->max_thr_num; i++) {
                if (pool->threads[i] = 0 || is_thread_alive(pool->threads[i])) {
                    pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void*)pool);
                    add++;
                    pool->live_thr_num++;
                }
                pthread_mutex_unlock(&(pool->lock));
            }

            /*销毁多余的空闲线程 算法:忙线程X2 小于 存活的线程数 且 存活的线程数 大于 最小线程数时*/
            if ((busy_thr_num * 2) < live_thr_num && live_thr_num > pool->min_thr_num) {
                /*一次销毁DEFAULT_THREAD个线程,随機10個即可 */
                pthread_mutex_lock(&(pool->lock));
                pool->wait_exit_thr_num = DEFAULT_THREAD_VARY;  /*要销毁的线程数 设置为10 */
                pthread mutex_unlock(&(pool->lock));

                for (i = 0; i < DEFAULT THREAD VARY; i++) {
                    /*通知处在空闲状态的线程,他们会自行终止*/
                    pthread cond signal(&(pool->queue_not_enpty));
                }
            }
        }
        return NULL
    }

    /* F */
    //线程池中的线程,模拟处理业务
    void* process(void* arg) {
        printf("thread 0x%x working on taks %d/n", (usigned int)pthread_self(), (int)arg);
        sleep(1);
        printf("task %d is end \n", (int)arg);

        return NULL;
    }

    /*向线程池中 添加一个任务 */
    //threadpool_add(thp,process,(void*)&num[i]); /*向线程池中添加任务 process:小写---->大写*/
    int threadpool_add(threadpool_t * pool, void* (*function)(void* arg), void* arg) {
        pthread mutex_lock(&(pool->lock));

        /*==为真,队列已经满,调wait阻塞 */
        while ((pool->queue_size == pool->queue_max_size) && (!pool->shutdown)) {
            pthread_cond_wait(&(pool->queue_not_full), & (pool->lock));
        }

        //为false,线程池不关闭,跳过
        if (pool->shutdown) {
            pthread_cond_broadcast(&(pool->queue_not_empty));
            pthread_mutex_unlock(&(pool->lock));
            return 0;
        }

        /*清空 工作线程 调用的回调函数 的参数arg */
        if (pool->task_queue[pool->queue_rear].arg != NULL) {
            pool->task_queue[pool->queue_rear].arg = NULL;
        }

        /*添加任务到任务队列里*/
        pool->task_queue[pool->queue rear].function = function;
        pool->task_queue[pool->queue rear].arg = arg;
        pool->queue rear = (pool->queue rear + 1) % pool->queue_max_size;  /*队尾指针移动,模拟环形*/
        pool->queue_size++;

        /*添加完任务后,队列不为空,唤醒线程池中 等待处理任务的线程*/
        ptread_cond_signal(&(pool->queue_not_empty));
        pthread_mutex_unlock(&(pool->lock));

            return 0;
    }

    /* G */
    //销毁线程池
    int threadpool_destroy(threadpool_t * pool)
    {
        int i;
        if (pool == NULL) {
            return -1;
        }
        pool->shutdown = true;

        /*先销毁管理线程 */
        pthread_join(pool->adjust_tid, NULL);

        for (i = 0; i < pool->adjust_tid, NULL) {
            /*通知所有的空闲线程*/
            //这个销毁函数是在任务都被处理完了才会被调用,所以线程池里的线程都空闲,阻塞在wait等待被唤醒
            pthread_cond_broadcast(&(pool->queue_not_empty));
            //任务队列没任务,这时候还去唤醒正在等待任务的线程,欺骗它有任务将其唤醒,使其销毁 -- 在 分析D 的while(任务数 == 0)
            //的pthread_cond_wait -- 被唤醒
        }
        for (i = 0; i < pool->live_thr_num; i++) {
            pthread_join(pool->threads[i], NULL);
        }
        threadpool_free(pool);

        return 0;
    }
    //对锁和条件变量的销毁,释放线程池的空间等
    int threadpool free(threadpool_t * pool) {
        if (POOL == NULL) {
            return -1;
        }
        if (pool->task_queue) {
            free(pool->task_queue);
        }
        if (pool->threads) {
            free(pool->threads);
            pthread_mutex_lock(&(pool->lock));
            pthread_mutex_destroy(&(pool->lock));
            pthread_mutex_lock(&(pool->thread_counter));
            pthread_mutex_destroy(&(pool->thread_counter));
            pthread_cond_destroy(&(pool->queue_not_empty));
            pthread_cond_destroy(&(pool->queue_not_full));
        }
        free(pool);
        pool = NULL;
        return 0;
    }

    /* B */
    int main(void) {
        /*threadpool_t *threadpool_create(int min_thr _num, int max_ thr_num, int queue_max_size);*/
        threadpool_t* thp = threadpool_create(3, 100, 100); /*创建线程池,池里最小3个线程,最大100,队列最大100*/
        printf("pool inited");
        //int *num =(int *)malloc(sizeof(int)*20);
        int num[20],i;
        for (i = 0; i < 20; i++) {
            num[i] = i;
            printf("add task %d\n", i);
            /*int threadpool_add(threadpool_t *pool, void*(*function)(void *arg), void *arg)*/
            threadpool_add(thp,process, (void*)&num[i]); /* 向线程池中添加任务*/
        }
        sleep(10);   /*等子线程完成任务*/
        threadpool_destroy(thp);

        return 0;
    }

3.分析

A.线程池结构体pool

1.真正的线程池是pool->threads,是个存储各个线程pid的数组,pool其余的成员都是对线程池的限制完成

2.关于线程任务结构体:

  • typedef struct {
        void *(*function)(void *);  /*函数指针,回调函数 */
        void *arg;   /*上面函数的参数 */
    }threadpool_task_t;  /*各子线程任务结构体 */
    //pool中任务队列threadpool_task_t *task_queue的队员
  • function是函数指针,指向模拟处理任务业务的回调函数process,arg是要处理内容,是function的参数

3.任务队列threadpool_task_t *task_queue:

pool中有任务队列头尾的下标 -- queue_front和queue_rear,需要注意的是采用的是环形队列的方法,下标rear的元素是空的

有新任务到来,该下标处的元素置空再将新任务赋值上去比较妥当:

--- 分析 F pthread_add任务添加函数

4.pool->shutdown,为false表示线程池不关闭

B.main

1.创建线程

分析C:pool的初始化; -- threadpool_create函数

分析D:线程池最少线程的创建;-- C中调用pthread_create创建线程,调用回调函数threadpool_thread(D)

分析E:管理线程的创建 -- 一样

2.模拟客户端产生线程,将其写进pool的任务队列task_queue当中 -- threadpool_add函数

也需要加锁,防止在添加进任务、对pool的queue_size现有任务数等进行更新时,被线程取走了任务,导致记录任务数和现有任务数不同 -- 分析F

3.销毁线程池 -- G

C.pool的初始化

创建线程池初始化好相关属性:最小线程数、最大线程数、开阔一篇地址(数组)作为线程池 -- 存储子线程pid的数组、管理线程、锁和条件变量的初始化、任务队列的开辟、创建好最小线程 -- 处理任务的回调函数、最大队列数等

do while(0),并不是说只需要一次,只要有一个步骤出现问题,就会break跳出循环,这就进入下一轮循环,知道do while中各个步骤都没问题没有break就会跳出do while

1.pthread_create创建线程,调用线程的回调函数threadpool_thread -- 分析D

  • 刚创建线程池,任务队列也为空,线程池无法工作,释放锁阻塞等待 -- D

2.继续执行主线程的,创建管理线程 -- pool_adjust_tid:--分析E

pthread_create(&(pool->adjust_tid),NULLL,adjust_thread,(void *)pool);  //创建管理者线程
调用回调函数adjust_thread -- 分析E
负责对线程池进行增加或者减少线程
刚创建没任务,线程也不需要扩展或者减少,让其一直在那循环监控就绪

初始化完成

D.创建子线程回调函数

void *threadpool_thread(void *threadpool) -- 工作线程

C中pool的初始化创建子线程时会调用,参数传的是pool;以及管理线程感知到线程池数量不够时也会调用来创建线程

1.将pool结构体锁住

2.没有任务(queue_not_empty不满足,队列为空):解锁 -- pthread_cond_wait -- 阻塞等待条件满足 -- 任务到来,满足条件变量queue_not_empty,队列不为空 -- pool加锁,处理任务

3.如果是 C 刚初始化后的,队列还没有任务,就一直阻塞,先回到 C 主线程继续创建管理线程

4.如果初始化完成的pool,B F 中模拟客户端产生了任务添加进pool中的任务队列threadpool_task_t *task_queue,queue_size != 0,对任务进行处理

5.定义一个threadpool_task_t task存放从任务队列中取出来的任务,队列有空间了,就可以通知正阻塞在任务队列不为满的条件变量上的任务可以进入队列;执行任务 --> 给busy_thr_num忙线程数加锁,并且该数++ --> 表示正式工作的

6.调用执行任务函数task.function -- process,传参传task.arg

7.任务处理完成,对忙状态线程数加锁--后解锁

之所以要设置忙状态线程数,是因为管理线程对线程池进行扩容或是缩减需要有个参照;而加锁是为了多个线程恰巧同时处理完任务,对忙状态线程数进行--时,本来该-3,结果还没写进内存又被更改覆盖掉了,总共变成-1

//线程池中各个工作线程 -- 线程创建的回调函数
void *threadpool thread(void *threadpool){
    threadpool_t *pool=(threadpool t *)threadpool;
    threadpool_task_t task; //各个子线程的任务结构体:函数指针--回调函数;回调函数的参数arg

    while(true){
        /*Lock must be taken to wait on conditional variable */
        /*刚创建出线程,等待任务队列里有任务,否则阻塞等待任务队列里有任务后再唤醒接收任务*/
        pthread_mutex_lock(&(pool->lock));

        /*  3 初始化--- queue_size == 0 说明没有任务,调 wait 阻塞在条件变量上,将锁释放,等待任务到来;若有任务,跳过该while*/
        while((pool->queue_size==0)&&(!pool->shutdown)){
           printf("thread @x%x is waiting\n",(unsigned int)pthread_self());
           pthread _cond_wait(&(pool->queue_not_empty),&(pool->lock));
           //初始化完成后, F 的任务添加函数add被调用,任务添加,线程被唤醒,queue_size != 0,跳出循环

           /*清除指定数目的空闲线程,如果要结束的线程个数大于0,结束线程*/
           //这个if语句就是用于销毁扩容的线程的,管理者线程就是欺骗这些空闲的线程将其唤醒,使其指向下面的代码,实现线程池搜身
           if(pool->wait_exit_thr_num >0){
               pool->wait_exit_thr_num--;
               /*如果线程池里线程个数大于最小值时可以结束当前线程*/
               if(pool->live_thr_num > pool->min_thr_num){
                   printf("thread 0x%x is exiting\n",(unsigned int)pthread self());
                   pool->live_thr_num--;
                   pthread_mutex_unlock(&(pool->lock));
                   pthread_exit(NULL);                   
               }
           }
        }

        /*如果指定了true,要关闭线程池里的每个线程,自行退出处理---销毁线程池*/
        if(pool->srutdown){
            pthread_mutex_unlock(&(pool->lock));
            printf("thread 0x%x is exiting\n",(unsigned int)pthread_self());
            pthread_detach(pthread_self());
            pthread_exit(NULL);  /*线程自行结束 */
        }

        /*从任务队列里获取任务,是一个出队操作*/
        task.function = pool->task queue[pool->queue front].function;
        task.arg = pool->task_queue[pool->queue_front].arg;
        pool->queue_front =(pool->queue_front + 1)% pool->queue _max_size;/*出队,模拟环形队列 */
        pool->queue size--;

        /*通知可以有新的任务添加进来*/
        pthread_cond_broadcast(&(pool->queue_not_full));

        /*任务取出后,立即将 线程池琐 释放*/
        pthread_mutex_unlock(&(pool->lock));

        /*执行任务*/
        printf("thread 0x%x start working\n",(unsigned int)pthread_self());
        pthread_mutex_lock(&(p0ol->thread_counter);  /*忙状态线程数变量琐*/
        pool->busy_thr_num++;   /*忙状态线程数+1*/
        pthread_mutex_unlock(&(pool->thread_counter));

        (*(task.function))(task.arg);  /*执行回调函数任务*/
        //task.function(task.arg);  /*执行回调函数任务*/

        /*任务结束处理*/
        printf("thread 0x%x end working\n",(unsigned int)pthread_self());
        pthread_mutex_lock(&(pool->thread counter));
        pool->busy_thr_num--;   /*处理掉一个任务,忙状态数线程数-1*/    
        pthread_mutex_unlock(&(pool->thread_counter));
    }
    pthread_exit(NULL);
}

E.管理线程回调函数adjust_thread

在C中,对线程池的创建初始化

1.C中调用pthread_create,创建管理线程,调用其回调函数adjust_thread

2.对pool结构体加锁,查看目前的任务数和线程数(不加锁的话,比如在查看live_thr_num时,刚好有线程加入对live_thr_num做修改,同时进行出现问题),查看完后解锁对忙着的线程pool->busy_thr_num加锁,查看忙着的线程数,解锁(忙着的线程数专门有一把自己的锁,对busy查看修改时,不影响live_thr_num和任务数,其它线程可在这时候对pool加锁查看live等属性)

3.定时对线程数量进行监控

4.线程数量不够,调用pthread_create,传入 D 创建子线程的回调函数threadpool_thread,为线程池pool->threads添加线程

5.空闲线程数量过多,对其进行销毁

6.每10秒循环一次进行监控,该管理线程没有阻塞

void *adjust_thread(void *threadpool){
    int i;
    threadpool_t *pool =(threadpool_t*)threadpool;
    while(!pool->shutdown){
        sleep(DEFAULT_TIME);      //定时对线程池管理

        pthread_mutex_lock(8(pool->lock));
        int queue_size = pool->queue_size;   //关注 任务数
        int live_thr_nun= pool->live_thr_num;  //存活 线程数
        pthread_mutex_unlock(&(pool->lock));

        pthread_mutex_lock(&(pool->thread counter));
        int busy_thr_num = pool->busy_thr_num;  //忙着的线程数
        pthread_mutex_unlock(&(pool->thread_counter));

        /*创建新线程 算法 : 任务数大于最小线程池个数,且存活的线程数少于最大线程个数时 如:30>=10 && 40<100*/
        if(queue size >= MIN_WAIT_TASK_NUM 8& live thr num < pool->max_thr_num){
            pthread mutex lock(&(pool->lock));
            int add = 0;
            /*一次增加 DEFAULT THREAD 个线程*/
            for(i=0;i < pool->max_thr_num && add < DEFAULT_THREAD_VARY 
                && pool->live_thr_num< pool->max_thr_num;i++){
                if(pool->threads[i]= 0 ||is_thread_alive(pool->threads[i])){
                    pthread_create(&(pool->threads[i]),NULL,threadpool_thread,(void*)pool);
                    add++;
                    pool->live_thr_num++;
                }
                pthread_mutex_unlock(&(pool->lock));
        }

            /*销毁多余的空闲线程 算法:忙线程X2 小于 存活的线程数 且 存活的线程数 大于 最小线程数时*/
        if((busy_thr_num*2)<live_thr_num && live_thr_num> pool->min_thr_num){
            /*一次销毁DEFAULT_THREAD个线程,随機10個即可 */
            pthread_mutex_lock(&(pool->lock));
            pool->wait_exit_thr_num = DEFAULT_THREAD_VARY;  /*要销毁的线程数 设置为10 */
            pthread mutex_unlock(&(pool->lock));

            for(i=0;i<DEFAULT_THREAD_VARY; i++){
                /*通知处在空闲状态的线程,他们会自行终止*/
                pthread_cond_signal(&(pool->queue_not_empty));
                //欺骗空闲的线程有任务了,将其唤醒
            }
        }        
    }
    return NULL
}

D中的回调函数部分代码:

F.模拟添加任务 -- main 3

typedef struct {
    void *(*function)(void *);  /*函数指针,回调函数 */
    void *arg;   /*上面函数的参数 */
}threadpool_task_t;  /*各子线程任务结构体 */
//pool中任务队列threadpool_task_t *task_queue的队员

main函数中调用该函数
/*int threadpool_add(threadpool_t *pool, void*(*function)(void *arg), void *arg)*/
threadpool_add(thp,process,(void*)&num[i]); /* 向线程池中添加任务*/
参1是pool
参2是处理任务的回调函数
参3存的是任务,是回调函数process的参数 -- 要处理的任务
参2和参3在add函数中会存进threadpool_task_t,然后存进pool->task_queue任务队列

1.函数process是模拟处理任务业务,每个线程通过调用该函数去处理任务,这里功能省略了

void *process(void *arg){
    printf("thread 0x%x working on taks %d/n",(usigned int)pthread_self(),(int) arg);
    sleep(1);
    printf("task %d is end \n",(int) arg);

    return NULL;
}

2.B main函数调用threadpool_add函数

3.对pool加锁,检查任务pool->queue_size是否满,满则调pthread_cond_wait阻塞,释放锁等待线程处理任务,一直while不断检查,知道有线程处理了任务,wait重新给pool上锁,继续执行代码

4.有空的空间,添加任务到任务队列

环形指针,rear处元素置空存放新来的任务

/*向线程池中 添加一个任务 */
//threadpool_add(thp,process,(void*)&num[i]); /*向线程池中添加任务 process:小写---->大写*/
int threadpool_add(threadpool_t *pool, void*(*function)(void *arg), void *arg){
    pthread mutex_lock(&(pool->lock));

    /*==为真,队列已经满,调wait阻塞 */
    while((pool->queue_size ==pool->queue_max_size)&&(!pool->shutdown)){
        pthread_cond_wait(&(pool->queue_not_full),&(pool->lock));
    }

    //为false,线程池不关闭,跳过
    if(pool->shutdown){
        pthread_cond_broadcast(&(pool->queue_not_empty));
        pthread_mutex_unlock(&(pool->lock));
        return 0;
    }

    /*清空 工作线程 调用的回调函数 的参数arg */
    if(pool->task_queue[pool->queue_rear].arg != NULL){
        pool->task_queue[pool->queue_rear].arg = NULL;
    }

    /*添加任务到任务队列里*/
    pool->task_queue[pool->queue rear].function = function;  //process -- 处理任务的回调函数赋值给任务队列
    pool->task_queue[pool->queue rear].arg = arg;  // 要处理的任务arg传给赋值任务队列
    pool->queue_rear =(pool->queue_rear + 1)% pool->queue_max_size;  /*队尾指针移动,模拟环形*/
    pool->queue_size++;  //向任务队列中添加了一个任务,记录队列现有任务数++


    /*添加完任务后,队列不为空,唤醒线程池中 等待处理任务的线程*/
    ptread_cond_signal(&(pool->queue_not_empty));  //唤醒阻塞在队列为空的想要处理任务的线程 -- 初始化时会唤醒到
    pthread_mutex_unlock(&(pool->lock));  //添加任务完毕,释放锁,等某个线程抢到锁处理任务

    return 0;
}

G.销毁线程池

1.threadpool_destroy(threadpool_t *pool)

//销毁线程池
int threadpool_destroy(threadpool_t *pool)
{
    int i;
    if(pool == NULL){
        return -1;            
    }
    pool->shutdown = true;

    /*先销毁管理线程 */
    pthread_join(pool->adjust_tid,NULL);

    for(i = 0;i< pool->live_thr_num;i++){
        /*通知所有的空闲线程*/
        //这个销毁函数是在任务都被处理完了才会被调用,所以线程池里的线程都空闲,阻塞在wait等待被唤醒
        pthread_cond_broadcast(&(pool->queue_not_empty));
        //任务队列没任务,这时候还去唤醒正在等待任务的线程,欺骗它有任务将其唤醒,使其销毁 -- 在 分析D 的while(任务数 == 0)
        //的pthread_cond_wait -- 被唤醒
    }
    for(i = 0;i < pool->live_thr_num; i++){
        pthread_join(pool->threads[i],NULL);    
    }
    threadpool_free(pool);

    return 0;
}

13行 - 19行:

2.int threadpool_free(threadpool_t *pool):

//对锁和条件变量的销毁,释放线程池的空间等
int threadpool free(threadpool_t *pool){
    if(POOL == NULL){
        return -1;
    }
    if(pool->task_queue){
        free(pool->task_queue);
    }
    if(pool->threads){
        free(pool->threads);
        pthread_mutex_lock(&(pool->lock));
        pthread_mutex_destroy(&(pool->lock));
        pthread_mutex_lock(&(pool->thread_counter));
        pthread_mutex_destroy(&(pool->thread_counter));
        pthread_cond_destroy(&(pool->queue_not_empty));
        pthread_cond_destroy(&(pool->queue_not_full));
    }
    free(pool);
    pool = NULL;
    return 0;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2187636.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

linux学习--第七天(多路复用IO)

多路复用IO -阻塞IO与非阻塞IO -IO模型 IO的本质时基于操作系统接口来控制底层的硬件之间数据传输&#xff0c;并且在操作系统中实现了多种不同的IO方式&#xff08;模型&#xff09;比较常见的有下列三种&#xff1a; 1.阻塞型IO模型 2.非阻塞型IO模型 3.多路复用IO模型 -阻…

开源2+1链动模式AI智能名片O2O商城小程序源码:线下店立体连接的超强助力器

摘要&#xff1a;本文将为您揭示线下店立体连接的重大意义&#xff0c;您知道吗&#xff1f;线上越火&#xff0c;线下就得越深入经营。现代门店可不再只是卖东西的地儿&#xff0c;还得连接KOC呢&#xff01;咱们来看看门店要做的那些超重要的事儿&#xff0c;还有开源21链动模…

Authentication Lab | CVE-2019-7644 - JWT Signature Disclosure

关注这个靶场的其他相关笔记&#xff1a;Authentication Lab —— 靶场笔记合集-CSDN博客 0x01&#xff1a;JWT Signature Disclosure 前情提要 本关的考点是 JWT&#xff08;Json Web Token&#xff09;漏洞&#xff0c;JWT 是一个用于跨域认证的技术。如果你不了解 JWT&…

计算机视觉——图像修复综述篇

目录 1. Deterministic Image Inpainting 判别器图像修复 1.1. sigle-shot framework (1) Generators (2) training objects / Loss Functions 1.2. two-stage framework 2. Stochastic Image Inpainting 随机图像修复 2.1. VAE-based methods 2.2. GAN-based methods …

攻防世界----->easyre-153

做题笔记。 下载 查壳。 UPX&#xff0c;---脱壳。 32ida打开。 先运行一下&#xff1a; 查找字符校位。 管道父子&#xff1f;有点像此前做的那个进程互斥。。。 分析&#xff1a; 跟进lol &#xff1f; 查看汇编窗口看看。(因为一个函数只存在一个打印函数&#xff0c;就很…

集合框架01:集合的概念、Collection体系、Collection接口

1.集合的概念 集合是对象的容器&#xff0c;定义了多个对象进行操作的常用方法。可实现数组的功能。 集合和数组的区别&#xff1a; 1.数组长度固定&#xff0c;集合长度不固定&#xff1b; 2.数组可以存储基本类型和引用类型&#xff0c;集合只能存储引用类型&#xff1b; …

读数据湖仓06数据集成

1. 数据湖仓中的数据集成 1.1. 数据湖仓的总体目标是为每一个人提供支持&#xff0c;包括从普通职员到CEO 1.2. 有了作为基础设施的基础数据&#xff0c;企业等组织才能实现真正的数据驱动 1.3. 提供组织所需的数据&#xff0c;最关键的一环在于提供集成的数据基础 1.3.1. 只…

信息安全工程师(32)认证技术方法

前言 认证技术方法是用于验证用户、设备或系统身份的各种技术手段和方法&#xff0c;旨在确保只有经过验证的实体才能访问系统资源&#xff0c;从而保护数据和系统的安全。 一、常见认证技术方法 密码认证 描述&#xff1a;用户通过输入预先设置的密码进行身份验证。优点&#…

The 14th Jilin Provincial Collegiate Programming Contest

题目 #include <bits/stdc.h> using namespace std; #define int long long #define pb push_back #define fi first #define se second #define lson p << 1 #define rson p << 1 | 1 #define ll long long #define pii pair<int, int> #define ld lo…

C语言 | Leetcode C语言题解之第455题分发饼干

题目&#xff1a; 题解&#xff1a; int cmp(int* a, int* b) {return *a - *b; }int findContentChildren(int* g, int gSize, int* s, int sSize) {qsort(g, gSize, sizeof(int), cmp);qsort(s, sSize, sizeof(int), cmp);int m gSize, n sSize;int count 0;for (int i …

D26【python 接口自动化学习】- python 基础之判断与循环

day26 语句嵌套 学习日期&#xff1a;20241003 学习目标&#xff1a;判断与循环&#xfe63;-36 语句嵌套&#xff1a;如何处理多重嵌套的问题&#xff1f; 学习笔记&#xff1a; 语句嵌套的用途 在条件语句中使用另外一个条件语句 在循环中使用条件语句 多重循环 总结 1…

Authentication Lab | JWT None Algorithm

关注这个靶场的其他相关笔记&#xff1a;Authentication Lab —— 靶场笔记合集-CSDN博客 0x01&#xff1a;JWT None Algorithm 前情提要 本关的考点是 JWT&#xff08;Json Web Token&#xff09;漏洞&#xff0c;JWT 是一个用于跨域认证的技术。如果你不了解 JWT&#xff0c…

<<迷雾>> 第6章 加法机的诞生(3)--三比特加法电路 示例电路

用全加器组成一个三比特加法电路 info::操作说明 鼠标单击开关切换开合状态 primary::在线交互操作链接 https://cc.xiaogd.net/?startCircuitLinkhttps://book.xiaogd.net/cyjsjdmw-examples/assets/circuit/cyjsjdmw-ch03-02-3-bit-adder.txt 原图 加法机的简单图示 info::操…

Python案例--数字组合

在编程和数据处理中&#xff0c;我们经常需要从给定的元素中生成所有可能的组合。本文将通过一个简单的Python程序&#xff0c;展示如何生成由四个给定数字&#xff08;0-9&#xff09;组成的所有可能的无重复三位数组合。这可以应用于多种场景&#xff0c;如密码生成、数据校验…

【MySQL实战45讲6】全局锁和表锁

文章目录 全局锁表级锁 全局锁 顾名思义&#xff0c;全局锁就是对整个数据库实例加锁。MySQL提供了一个对全局读锁的方法&#xff0c;命令是Flush tables with read lock (FTWRL) 当需要让整个库处于只读状态的时候&#xff0c;可以使用这个命令&#xff0c;之后其他线程的以下…

计算机毕业设计python+spark知识图谱课程推荐系统 课程预测系统 课程大数据 课程数据分析 课程大屏 mooc慕课推荐系统 大数据毕业设计

《PythonSpark知识图谱课程推荐系统》开题报告 一、研究背景与意义 随着互联网技术的快速发展&#xff0c;在线教育平台已成为人们获取知识、提升技能的重要途径。然而&#xff0c;面对海量的课程资源&#xff0c;用户往往难以快速找到符合自己兴趣和需求的课程。传统的课程推…

Web安全 - 安全防御工具和体系构建

文章目录 安全标准和框架1. 国内安全标准&#xff1a;等级保护制度&#xff08;等保&#xff09;2. 国际安全标准&#xff1a;ISO27000系列3. NIST安全框架&#xff1a;IDPRR方法4. COBIT与ITIL框架 防火墙防火墙的基本作用防火墙的三种主要类型防火墙的防护能力防火墙的盲区 W…

【C++前缀和】3026. 最大好子数组和|1816

本文涉及的基础知识点 C算法&#xff1a;前缀和、前缀乘积、前缀异或的原理、源码及测试用例 包括课程视频 LeetCode3026. 最大好子数组和 给你一个长度为 n 的数组 nums 和一个 正 整数 k 。 如果 nums 的一个 子数组 中&#xff0c;第一个元素和最后一个元素 差的绝对值恰…

8643 简单选择排序

### 思路 简单选择排序是一种原地排序算法&#xff0c;通过在未排序部分中选择最小&#xff08;或最大&#xff09;元素并将其放置在已排序部分的末尾来进行排序。每次选择后输出当前排序结果。 ### 伪代码 1. 读取输入的待排序关键字个数n。 2. 读取n个待排序关键字并存储在数…

探索聚丙烯酸水凝胶,从制备到特性,再到3D打印实践

大家好&#xff01;今天我们来了解一种基于聚丙烯酸的自修复水凝胶——《Preparation and Characterization of Poly(Acrylic Acid)-Based Self-Healing Hydrogel for 3D Shape Fabrication via Extrusion-Based 3D Printing》发表于《Materials》。在材料科学领域&#xff0c;…