从头开始:构建一个基于C/C++的线程池

news2024/11/16 3:45:21

手搓线程池

  • 线程池工作原理和实现
    • 线程池工作原理
        • 1. 线程池的基本组成:
        • 2. 线程池的基本执行流程:
        • 3. 线程池的核心参数:
        • 4. 线程池的生命周期:
        • 5. 线程池的执行策略:
    • 相关知识点
      • 线程与进程的比较
      • 读写锁
      • 互斥锁
    • 基于C语言的线程池设计与实现
    • 基于C++的线程池设计与实现

线程池工作原理和实现

线程池工作原理

线程池(Thread Pool)是一种预先创建和管理一组工作线程的技术,用来优化并发任务的执行。通过复用这些线程来执行多个任务,线程池可以减少线程创建和销毁的开销,提高系统的性能和响应速度。

1. 线程池的基本组成:
  • 任务队列(Task Queue):存放待执行任务的队列。当有新的任务提交时,它会进入任务队列,等待可用线程来执行。
  • 工作线程(Worker Threads):线程池中的一组线程,用来处理任务队列中的任务。线程池启动时会创建一定数量的工作线程。
  • 线程池管理器(Thread Pool Manager):负责管理线程池的大小、任务分配、线程的创建和销毁。
2. 线程池的基本执行流程:
  1. 提交任务:用户向线程池提交任务,通常是实现了 RunnableCallable 接口的对象。任务被放入任务队列中。
  2. 任务分配:线程池中的某个空闲工作线程会从任务队列中取出一个任务,并开始执行。任务队列是线程安全的,确保多个线程可以同时取任务而不发生冲突。
  3. 任务执行:工作线程运行并执行任务中的代码。线程池中的工作线程是循环的,每个线程在完成一个任务后会继续取下一个任务。
  4. 复用线程:任务完成后,线程不会被销毁,而是回到线程池中成为“空闲线程”,等待下一个任务。
  5. 动态调整线程数量:如果任务数量激增,线程池可以根据策略(如扩展线程池的大小)创建新的线程来处理更多的任务;如果任务减少,线程池也可能会销毁一些线程以节省资源。
3. 线程池的核心参数:
  • 核心线程数(corePoolSize):线程池中保持活跃的核心线程数量,即使线程处于空闲状态也不会被回收。
  • 最大线程数(maximumPoolSize):线程池中允许的最大线程数量。当任务数量超过核心线程数时,线程池会根据需求创建更多的线程,直到达到最大线程数。
  • 任务队列(workQueue):用于存放等待执行的任务。如果当前所有线程都在忙碌,新的任务会被放入任务队列中。
  • 线程存活时间(keepAliveTime):线程池中超过核心线程数的空闲线程在等待新任务的最长等待时间,超过这个时间后将被销毁。
  • 线程工厂(ThreadFactory):用于创建新线程,方便自定义线程的属性,如线程的名称、优先级等。
  • 拒绝策略(RejectedExecutionHandler):当任务过多而无法处理时(任务队列满了且线程池的线程数已达到上限),线程池会执行拒绝策略,常见策略包括丢弃任务、抛出异常、或由调用者线程直接执行。
4. 线程池的生命周期:
  • 运行状态(RUNNING):线程池处于运行状态,可以接受任务并处理任务。
  • 关闭状态(SHUTDOWN):线程池不再接受新任务,但会继续处理已经提交的任务。
  • 停止状态(STOP):线程池不再接受任务,且会中断正在执行的任务。
  • 终止状态(TERMINATED):所有任务执行完毕,线程池中的线程全部销毁,线程池彻底关闭。
5. 线程池的执行策略:
  • 先使用核心线程:线程池优先利用核心线程来处理任务,只有在核心线程全部繁忙的情况下,才会将任务放入任务队列。
  • 任务队列满了,创建新线程:如果任务队列已满且所有核心线程都在工作,线程池会创建新的线程,直到达到 maximumPoolSize
  • 拒绝任务:如果任务队列满了,且线程池中的线程数量已经达到 maximumPoolSize,根据配置的拒绝策略处理新任务。

相关知识点

线程与进程的比较

  • 线程启动速度快,轻量级
  • 线程使用有一定难度,需要处理数据一致性问题
  • 同一线程共享的有堆、全局变量、静态变量、指针等,而独自占有栈
  • 线程是调度的基本单位(PC、状态码、通用寄存器、线程栈及栈指针);进程是拥有资源的基本单位(打开文件、堆、代码段等)
  • 一个进程内多个线程可以并发;多个进程可以并发
  • 拥有资源:线程不拥有系统资源,但一个进程的多个线程可以共享隶属进程的资源;进程是拥有资源的独立单位
  • 线程的系统开销小,线程创建销毁只需要处理PC值,状态码,通用寄存器值,线程栈及栈指针即可;进程创建和销毁需要重新分配及销毁task_struct结构。

读写锁

  • 多个读者可以同时进行读
  • 写者必须互斥(只允许一个写者写,也不能读者写者同时进行)
  • 写者优先于读者(一旦有写者,则后续读者必须等待,唤醒时优先考虑写者)

互斥锁

一次只能一个线程拥有互斥锁,其他线程只有等待。

互斥锁是在抢锁失败的情况下主动放弃CPU进入睡眠状态直到锁的状态改变时再唤醒,而操作系统负责线程调度,为了实现锁的状态发生改变时唤醒阻塞的线程或者进程,需要把锁交给操作系统管理,所以互斥锁在加锁操作时设计上下文的切换。互斥锁实际的效率还是可以让人接受的,加锁的时间大概100ns左右,而实际上互斥锁的一种可能的实现是先自旋一段时间,当自旋的时间超过阈值之后再将线程投入到睡眠中,因此在并发运算中使用互斥锁(每次占用锁的时间很短)的效果可能不亚于使用自旋锁。

互斥锁属于sleep-waiting类型的锁。例如在一个双核的机器上有两个线程A和B,它们分别运行在core 0和core 1上。假设线程A想要通过pthread_mutex_lock操作去得到一个临界区的锁,而此时这个锁正被线程B所持有,那么线程A就会被阻塞,此时会通过上下文切换将线程A置于等待队列中,此时core 0就可以运行其他的任务(如线程C)。

基于C语言的线程池设计与实现

任务队列

typedef struct Task
{
    void (*function) (void* arg); // void*是一个泛型,能够接收各种各样的数据类型
    void* arg;
}Task;

线程池定义

struct ThreadPool
{
    // 任务队列
    Task* taskQ; // 队列数组
    int queueCapacity;  // 容量
    int queueSize;      // 当前任务个数
    int queueFront;     // 队头,取数据
    int queueRear;      // 队尾,放数据

    pthread_t managerID;    // 管理者线程ID
    pthread_t *threadIDs;   // 工作的线程ID
    int minNum;             // 最小的线程数
    int maxNum;             // 最大的线程数
    int busyNum;            // 工作的线程个数
    int liveNum;            // 存活的线程个数
    int exitNum;            // 要杀死的线程个数
    pthread_mutex_t mutexpool;  // 锁住整个线程池
    pthread_mutex_t mutexBusy;  // 锁住busyNum变量
    pthread_cond_t notFull;     // 任务队列是否满了
    pthread_cond_t notEmpty;    // 任务队列是否空了

    int shutdown;               // 是否销毁线程池,销毁为1,不销毁为0
};

头文件声明

#ifndef _THREADPOOL_H
#define _THREADPOOL_H

typedef struct ThreadPool ThreadPool;
// 创建线程池并初始化
ThreadPool* threadPoolCreate(int min, int max, int queueSize);

// 销毁线程池
int threadPoolDestory(ThreadPool* pool);

// 给线程池添加任务
void threadPoolAdd(ThreadPool* pool, void(*func)(void*), void* arg);

// 获取线程池中工作的线程的个数
int threadPoolBusyNum(ThreadPool* pool);

// 获取线程池中活着的线程的个数
int threadPoolAliveNum(ThreadPool* pool);

void* worker(void* arg);

void* manager(void* arg);

void threadExit(ThreadPool* pool);

#endif 

源文件定义

#include "threadpool.h"
#include<pthread.h>
#include<stdlib.h>
#include<string.h>
#include<unistd.h>
#include<stdio.h>

const int NUMBER = 2;

// 任务结构体
typedef struct Task
{   
    // void*是一个泛型,能够接收各种各样的数据类型
    void (*function) (void* arg); // 函数指针
    void* arg;
}Task;


// 线程池结构体
struct ThreadPool
{
    // 任务队列
    Task* taskQ; // 队列数组
    int queueCapacity;  // 容量
    int queueSize;      // 当前任务个数
    int queueFront;     // 队头,取数据
    int queueRear;      // 队尾,放数据

    pthread_t managerID;    // 管理者线程ID
    pthread_t *threadIDs;   // 工作的线程ID
    int minNum;             // 最小的线程数
    int maxNum;             // 最大的线程数
    int busyNum;            // 工作的线程个数
    int liveNum;            // 存活的线程个数
    int exitNum;            // 要杀死的线程个数
    pthread_mutex_t mutexpool;  // 锁住整个线程池
    pthread_mutex_t mutexBusy;  // 锁住busyNum变量
    pthread_cond_t notFull;     // 任务队列是否满了,用于阻塞生产者
    pthread_cond_t notEmpty;    // 任务队列是否空了,用于阻塞消费者

    int shutdown;               // 是否销毁线程池,销毁为1,不销毁为0
};

ThreadPool* threadPoolCreate(int min, int max, int queueSize) {
    ThreadPool* pool = (ThreadPool*)malloc(sizeof(ThreadPool));
    do {
        if(pool == NULL) {
            printf("malloc threadpool fail....\n");
            break;
        }
        pool->threadIDs = (pthread_t*)malloc(sizeof(pthread_t) * max);
        if(pool->threadIDs == NULL) {
            printf("malloc threadIDs fail....\n");
            break;
        }
        memset(pool->threadIDs, 0, sizeof(pthread_t) * max);
        pool->minNum = min;
        pool->maxNum = max;
        pool->busyNum = 0;
        pool->liveNum = min; // 和最小个数相等
        pool->exitNum = 0;

        if(pthread_mutex_init(&pool->mutexpool, NULL) != 0 ||
        pthread_mutex_init(&pool->mutexBusy, NULL) != 0 ||
        pthread_cond_init(&pool->notEmpty, NULL) != 0 ||
        pthread_cond_init(&pool->notFull, NULL) != 0
        ) {
            printf("mutex or condif init fail....\n");
            break;
        }

        // 任务队列
        pool->taskQ = (Task*)malloc(sizeof(Task) * queueSize);
        pool->queueCapacity = queueSize;
        pool->queueSize = 0;
        pool->queueFront = 0;
        pool->queueRear = 0;

        pool->shutdown = 0;

        // 创建管理者线程和工作者线程
        pthread_create(&pool->managerID, NULL, manager, pool); // 第三个参数为管理者线程的任务函数
        for(int i = 0; i < min; i++) {
            pthread_create(&pool->threadIDs[i], NULL, worker, pool); // 第三个参数为工作的线程的任务函数
        }
        return pool;
    } while(0);

    // 释放资源
    if(pool->threadIDs) free(pool->threadIDs);
    if(pool->taskQ) free(pool->taskQ);
    if(pool) free(pool);

    return NULL;
}

int threadPoolDestory(ThreadPool* pool) {
    if(pool == NULL) {
        return -1;
    }

    // 关闭线程池
    pool->shutdown = 1;
    // 阻塞回收管理者线程
    pthread_join(pool->managerID, NULL);
    // 唤醒阻塞的消费者线程
    for(int i = 0; i < pool->liveNum; i++) {
        pthread_cond_signal(&pool->notEmpty);
    }
    // 释放堆内存
    if(pool->taskQ) {
        free(pool->taskQ);
    }
    if(pool->threadIDs) {
        free(pool->threadIDs);
    }
    
    pthread_mutex_destroy(&pool->mutexpool);
    pthread_mutex_destroy(&pool->mutexBusy);
    pthread_cond_destroy(&pool->notEmpty);
    pthread_cond_destroy(&pool->notFull);
    free(pool);
    pool = NULL;
    return 0;
}

void threadPoolAdd(ThreadPool* pool, void(*func)(void*), void* arg) {
    pthread_mutex_lock(&pool->mutexpool);
    while(pool->queueSize == pool->queueCapacity && !pool->shutdown) {
        // 阻塞生产者线程
        pthread_cond_wait(&pool->notFull, &pool->mutexpool);
    }
    if(pool->shutdown) {
        pthread_mutex_unlock(&pool->mutexpool);
        return;
    }
    // 添加任务
    pool->taskQ[pool->queueRear].function = func;
    pool->taskQ[pool->queueRear].arg = arg;
    pool->queueRear = (pool->queueRear + 1) % pool->queueCapacity;
    pool->queueSize++;

    pthread_cond_signal(&pool->notEmpty); // 唤醒阻塞在条件变量里的工作的线程,即生产者生产后唤醒消费者
    pthread_mutex_unlock(&pool->mutexpool);
}

int threadPoolBusyNum(ThreadPool* pool) {
    pthread_mutex_lock(&pool->mutexBusy);
    int busyNum = pool->busyNum;
    pthread_mutex_unlock(&pool->mutexBusy);
    return busyNum;
}

int threadPoolAliveNum(ThreadPool* pool) {
    pthread_mutex_lock(&pool->mutexpool);
    int liveNum = pool->liveNum;
    pthread_mutex_unlock(&pool->mutexpool);
    return liveNum;
}

void* worker(void* arg) {
    ThreadPool* pool = (ThreadPool*)arg;
    while(1) {
        pthread_mutex_lock(&pool->mutexpool); // 访问线程池之前加锁
        // 当前任务队列是否为空
        while(pool->queueSize == 0 && !pool->shutdown) { // 任务队列为空并且线程池没有被关闭
            // 阻塞工作线程
            pthread_cond_wait(&pool->notEmpty, &pool->mutexpool);

            // 判断是不是要销毁线程
            if(pool->exitNum > 0) {
                pool->exitNum--;
                if(pool->liveNum > pool->minNum) {
                    pool->liveNum--;
                    pthread_mutex_unlock(&pool->mutexpool); // 阻塞的时候已经获得锁,已经锁上,如果不解开就会死锁
                    threadExit(pool);
                }
            }
        }

        // 判断线程池是否被关闭了
        if(pool->shutdown) {
            pthread_mutex_unlock(&pool->mutexpool); // 避免死锁
            threadExit(pool);
        }

        // 从任务队列中取出一个任务
        Task task;
        task.function = pool->taskQ[pool->queueFront].function;
        task.arg = pool->taskQ[pool->queueFront].arg;
        // 移动头结点
        pool->queueFront = (pool->queueFront + 1) % pool->queueCapacity; // 循环队列
        pool->queueSize--;

        pthread_cond_signal(&pool->notFull); // 消费者消费完产品后唤醒生产者
        pthread_mutex_unlock(&pool->mutexpool); // 用完之后解锁

        printf("thread %ld start working...\n", pthread_self());
        pthread_mutex_lock(&pool->mutexBusy);
        pool->busyNum++;
        pthread_mutex_unlock(&pool->mutexBusy);
        task.function(task.arg);
        free(task.arg);
        task.arg = NULL;

        printf("thread %ld end working...\n", pthread_self());
        pthread_mutex_lock(&pool->mutexBusy);
        pool->busyNum--;
        pthread_mutex_unlock(&pool->mutexBusy);
    }
    return NULL;
}

void* manager(void* arg) {
    ThreadPool* pool = (ThreadPool*)arg;
    while(!pool->shutdown) {
        // 每隔三秒钟检测一次
        sleep(3);

        // 取出线程池中任务的数量和当前线程的数量
        pthread_mutex_lock(&pool->mutexpool);
        int queueSize = pool->queueSize;
        int liveNumber = pool->liveNum;
        pthread_mutex_unlock(&pool->mutexpool);

        // 取出忙的线程的数量
        pthread_mutex_lock(&pool->mutexBusy);
        int busyNumber = pool->busyNum;
        pthread_mutex_unlock(&pool->mutexBusy);

        // 添加线程
        // 任务的个数 大于 存活的线程个数 && 存活的线程数 小于 最大线程数
        if(queueSize > liveNumber && liveNumber < pool->maxNum) {
            pthread_mutex_lock(&pool->mutexpool);
            int count = 0;
            for(int i = 0; i < pool->maxNum && count < NUMBER && liveNumber < pool->maxNum; i++) {
                if(pool->threadIDs[i] == 0) {
                    pthread_create(&pool->threadIDs[i], NULL, worker, pool);
                    count++;
                    pool->liveNum++;
                }
            }
            pthread_mutex_unlock(&pool->mutexpool);
        }

        // 销毁线程
        // 忙的线程 * 2 小于 存活的线程数 && 存活的线程 大于 最小线程数
        // 之所以不用对pool->minNum加锁,是因为这个值是固定的,只需要读不需要写
        if(busyNumber * 2 < liveNumber && liveNumber > pool->minNum) {
            pthread_mutex_lock(&pool->mutexpool);
            pool->exitNum = NUMBER;
            pthread_mutex_unlock(&pool->mutexpool);
            // 让工作的线程自杀
            for(int i = 0; i < NUMBER; i++) {
                pthread_cond_signal(&pool->notEmpty);
            }
        }
    }
}

void threadExit(ThreadPool* pool) {
    pthread_t tid = pthread_self(); // 获得线程自身的ID。
    for(int i = 0; i < pool->maxNum; i++) {
        if(pool->threadIDs[i] == tid) { // tid对应的线程要退出了
            pool->threadIDs[i] = 0;
            printf("threadExit() called, %ld exiting...\n", tid);
            break;
        }
    }
    pthread_exit(NULL);
}

测试代码

#include<stdio.h>
#include "threadpool.h"
#include<pthread.h>
#include<unistd.h>
#include<stdlib.h>
#include<string.h>

void taskFunc(void* arg) {
    int num = *(int*)arg;
    printf("thread %ld is working, number = %d\n", pthread_self(), num);
    sleep(1);
}

int main() {
    // 创建线程池
    ThreadPool* pool = threadPoolCreate(3, 10, 100);
    for(int i = 0; i < 100; i++) {
        int* num = (int*)malloc(sizeof(int));
        *num = i + 100;
        threadPoolAdd(pool, taskFunc, num);
    }

    sleep(30); // 等待子线程把任务处理完毕

    threadPoolDestory(pool);
    return 0;
}

image-20240922170244684

基于C++的线程池设计与实现

因为在C++中,delete task.arg时候,由于delete void*类型是有危险的,因为void*指针只占四个字节,因此有可能不能全部地被释放,为了知道在程序中void*实际上是什么类型,因此在C++中可以使用模板来解决这一问题,因为模板可以传递类型。

任务队列声明

#pragma once
#include<queue>
#include<pthread.h>

using callback = void (*) (void* arg);

// 任务结构体
template <typename T>
struct Task
{   
    Task<T>() {
        function = nullptr;
        arg = nullptr;
    }
    Task<T>(callback f, void* arg) {
        this->arg = (T*)arg;
        function = f;
    }
    callback function; // 函数指针
    T* arg;
};

template <typename T>
class TaskQueue
{
private:
    std::queue <Task<T>> m_taskQ;
    pthread_mutex_t m_mutex; // 互斥锁
public:
    TaskQueue(/* args */);
    ~TaskQueue();

    // 添加任务
    void addTask(Task<T> task);
    void addTask(callback f, void* arg);
    // 取出一个任务
    Task<T> takeTask();
    // 获取当前任务的个数
    inline size_t taskNumber() { // 没有if判断什么的,比较简单的直接写成内联函数比较好
        return m_taskQ.size();
    }
};

任务队列定义

#include "TaskQueue.h"


template <typename T>
TaskQueue<T>::TaskQueue(/* args */)
{
    pthread_mutex_init(&m_mutex, NULL);
}

template <typename T>
TaskQueue<T>::~TaskQueue()
{
    pthread_mutex_destroy(&m_mutex);
}

template <typename T>
void TaskQueue<T>::addTask(Task<T> task)
{
    pthread_mutex_lock(&m_mutex);
    m_taskQ.push(task);
    pthread_mutex_unlock(&m_mutex);
}

template <typename T>
void TaskQueue<T>::addTask(callback f, void* arg)
{
    pthread_mutex_lock(&m_mutex);
    m_taskQ.push(Task<T>(f, arg));
    pthread_mutex_unlock(&m_mutex);
}

template <typename T>
Task<T> TaskQueue<T>::takeTask() 
{
    Task<T> task;
    pthread_mutex_lock(&m_mutex);
    if(!m_taskQ.empty()) {
        task = m_taskQ.front();
        m_taskQ.pop();
    }
    pthread_mutex_unlock(&m_mutex);
    return task;
}

线程池声明

#pragma once
#include "TaskQueue.h"
#include "TaskQueue.cpp"

template <typename T>
class ThreadPool
{
public:
    ThreadPool(int min, int max);
    ~ThreadPool();

    // 添加任务
    void addTask(Task<T> task);
    // 获取忙线程的个数
    int getBusyNumber();
    // 获取活着的线程个数
    int getAliveNumber();

private:
    // 工作的线程的任务函数
    static void* worker(void* arg);
    // 管理者线程的任务函数
    static void* manager(void* arg);
    void threadExit();

    static const int NUMBER = 2;

    pthread_mutex_t mutexPool;
    pthread_cond_t notEmpty;
    pthread_t* threadIDs;
    pthread_t managerID;
    TaskQueue<T>* taskQ;
    int minNum;
    int maxNum;
    int busyNum;
    int liveNum;
    int exitNum;
    bool shutdown = false;
};

线程池定义

#include "ThreadPool.h"
#include <iostream>
#include <string.h>
#include <string>
#include <unistd.h>

using namespace std;

template <typename T>
ThreadPool<T>::ThreadPool(int min, int max) {
    // 实例化任务队列
    do {
        taskQ = new TaskQueue<T>;
        if(taskQ == nullptr) {
            cout << "malloc threadIDs fail...\n";
            break;
        }

        threadIDs = new pthread_t[max];
        if(threadIDs == nullptr) {
            cout << "malloc threadIDs fail....\n";
            break;
        }
        memset(threadIDs, 0, sizeof(pthread_t) * max);
        minNum = min;
        maxNum = max;
        busyNum = 0;
        liveNum = min; // 和最小个数相等
        exitNum = 0;

        if(pthread_mutex_init(&mutexPool, NULL) != 0 ||
        pthread_cond_init(&notEmpty, NULL) != 0
        ) {
            cout << "mutex or condif init fail....\n";
            break;
        }

        shutdown = false;
        
        /*
        为什么要把this传给manager呢?
        因为manager是一个静态方法,静态方法它只能访问类里边的静态成员变量,
        它不能访问类的非静态成员变量。
        因此如果我们想要访问这些非静态成员变量,就必须要给这个静态方法传进去一个
        实例化对象,通过传进去的这个实例化对象来访问里边的非静态成员函数和变量
        */
        // 创建管理者线程和工作者线程
        pthread_create(&managerID, NULL, manager, this); // 第三个参数为管理者线程的任务函数
        for(int i = 0; i < min; i++) {
            pthread_create(&threadIDs[i], NULL, worker, this); // 第三个参数为工作的线程的任务函数
        }
        return;
    } while(0);

    // 释放资源
    if(threadIDs) delete []threadIDs;
    if(taskQ) delete taskQ;
}

template <typename T>
ThreadPool<T>::~ThreadPool() {

    // 关闭线程池
    shutdown = true;
    // 阻塞回收管理者线程
    pthread_join(managerID, NULL);
    // 唤醒阻塞的消费者线程
    for(int i = 0; i < liveNum; i++) { // 因为到了最后没有任务了,所以活着的线程都需要关闭
        pthread_cond_signal(&notEmpty);
    }
    // 释放堆内存
    if(taskQ) {
        delete taskQ;
    }
    if(threadIDs) {
        delete []threadIDs;
    }
    
    pthread_mutex_destroy(&mutexPool);
    pthread_cond_destroy(&notEmpty);
}

template <typename T>
void ThreadPool<T>::addTask(Task<T> task) {
    if(shutdown) {
        return;
    }
    // 添加任务
    taskQ->addTask(task);

    pthread_cond_signal(&notEmpty); // 唤醒阻塞在条件变量里的工作的线程,即生产者生产后唤醒消费者
}

template <typename T>
int ThreadPool<T>::getBusyNumber() {
    pthread_mutex_lock(&mutexPool);
    int busyNum = this->busyNum;
    pthread_mutex_unlock(&mutexPool);
    return busyNum;
}

template <typename T>
int ThreadPool<T>::getAliveNumber() {
    pthread_mutex_lock(&mutexPool);
    int liveNum = this->liveNum;
    pthread_mutex_unlock(&mutexPool);
    return liveNum;
}

template <typename T>
void* ThreadPool<T>::worker(void* arg) {
    ThreadPool* pool = static_cast<ThreadPool*>(arg); // static_cast相当于c里面的强制类型转换
    while(1) {
        pthread_mutex_lock(&pool->mutexPool); // 访问线程池之前加锁
        // 当前任务队列是否为空
        while(pool->taskQ->taskNumber() == 0 && !pool->shutdown) { // 任务队列为空并且线程池没有被关闭
            // 阻塞工作线程
            pthread_cond_wait(&pool->notEmpty, &pool->mutexPool);

            // 判断是不是要销毁线程
            if(pool->exitNum > 0) {
                pool->exitNum--;
                if(pool->liveNum > pool->minNum) {
                    pool->liveNum--;
                    pthread_mutex_unlock(&pool->mutexPool); // 阻塞的时候已经获得锁,已经锁上,如果不解开就会死锁
                    pool->threadExit();
                }
            }
        }

        // 判断线程池是否被关闭了
        if(pool->shutdown) {
            pthread_mutex_unlock(&pool->mutexPool); // 避免死锁
            pool->threadExit();
        }

        // 从任务队列中取出一个任务
        Task<T> task = pool->taskQ->takeTask();

        pool->busyNum++;
        // 因为任务队列可以无限大,所以生产者可以不用唤醒
        pthread_mutex_unlock(&pool->mutexPool); // 用完之后解锁

        printf("thread %ld start working...\n", pthread_self());
        task.function(task.arg);
        delete task.arg; // 
        task.arg = nullptr;

        printf("thread %ld end working...\n", pthread_self());

        pthread_mutex_lock(&pool->mutexPool);
        pool->busyNum--;
        pthread_mutex_unlock(&pool->mutexPool);
    }
    return NULL;
}

template <typename T>
void* ThreadPool<T>::manager(void* arg) {
    ThreadPool* pool = static_cast<ThreadPool*>(arg);
    while(!pool->shutdown) {
        // 每隔三秒钟检测一次
        sleep(3);

        // 取出线程池中任务的数量和当前线程的数量
        pthread_mutex_lock(&pool->mutexPool);
        int queueSize = pool->taskQ->taskNumber();
        int liveNumber = pool->liveNum;
        int busyNumber = pool->busyNum; // 取出忙的线程的数量
        pthread_mutex_unlock(&pool->mutexPool);
        

        // 添加线程
        // 任务的个数 大于 存活的线程个数 && 存活的线程数 小于 最大线程数
        if(queueSize > liveNumber && liveNumber < pool->maxNum) {
            pthread_mutex_lock(&pool->mutexPool);
            int count = 0;
            for(int i = 0; i < pool->maxNum && count < NUMBER && liveNumber < pool->maxNum; i++) {
                if(pool->threadIDs[i] == 0) {
                    pthread_create(&pool->threadIDs[i], NULL, worker, pool);
                    count++;
                    pool->liveNum++;
                }
            }
            pthread_mutex_unlock(&pool->mutexPool);
        }

        // 销毁线程
        // 忙的线程 * 2 小于 存活的线程数 && 存活的线程 大于 最小线程数
        // 之所以不用对pool->minNum加锁,是因为这个值是固定的,只需要读不需要写
        if(busyNumber * 2 < liveNumber && liveNumber > pool->minNum) {
            pthread_mutex_lock(&pool->mutexPool);
            pool->exitNum = NUMBER;
            pthread_mutex_unlock(&pool->mutexPool);
            // 让工作的线程自杀
            for(int i = 0; i < NUMBER; i++) {
                // 唤醒已经被阻塞的工作的线程,因为工作的线程无事可做,肯定是被阻塞在条件变量中
                pthread_cond_signal(&pool->notEmpty); 
            }
        }
    }
}

template <typename T>
void ThreadPool<T>::threadExit() {
    pthread_t tid = pthread_self(); // 获得当前线程的线程ID。
    for(int i = 0; i < maxNum; i++) {
        if(threadIDs[i] == tid) { // tid对应的线程要退出了
            threadIDs[i] = 0;
            printf("threadExit() called, %ld exiting...\n", tid);
            break;
        }
    }
    pthread_exit(NULL);
}

测试

#include "ThreadPool.h"
#include "ThreadPool.cpp"
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>

void taskFunc(void* arg) {
    int num = *(int*)arg;
    printf("thread %ld is working, number = %d\n", pthread_self(), num);
    sleep(1);
}

int main() {
    // 创建线程池
    ThreadPool<int> pool(3, 10);
    for(int i = 0; i < 100; i++) {
        int* num = new int(i + 100);
        pool.addTask(Task<int>(taskFunc, num));
    }

    sleep(20); // 等待子线程把任务处理完毕

    return 0;
}

g++ main.cpp -o main -lpthread

image-20240923173739851

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2163199.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【云原生安全篇】Trivy助力离线Harbor漏洞扫描实践

【云原生安全篇】Trivy助力离线Harbor漏洞扫描实践 目录 1 概念 1.1 为什么需要离线漏洞扫描1.2 Trivy和Harbor 简介1.3 实现离线漏洞扫描的技术方案 2 实践&#xff1a;Trivy 为Harbor提供离线漏洞扫描 2.1 环境准备2.2 安装Trivy作为数据库离线包下载代理 2.2.1 通过包管理…

MySQL_连接查询

课 程 推 荐我 的 个 人 主 页&#xff1a;&#x1f449;&#x1f449; 失心疯的个人主页 &#x1f448;&#x1f448;入 门 教 程 推 荐 &#xff1a;&#x1f449;&#x1f449; Python零基础入门教程合集 &#x1f448;&#x1f448;虚 拟 环 境 搭 建 &#xff1a;&#x1…

【大数据】数据中台怎么样助力企业创新和客户实践

在当今数字化时代&#xff0c;数据成为了企业竞争的关键因素。企业拥有大量的数据&#xff0c;但如何高效地利用这些数据&#xff0c;实现创新和提升客户体验&#xff0c;成为了一项重要的挑战。数据中台作为一种重要的数据管理和分析工具&#xff0c;发挥着关键的作用。本文将…

Maven 学习整理

1. Maven 简介 Maven 是 Apache 基金会推出的一个用于管理和构建 Java 项目的工具。它基于项目对象模型 (Project Object Model , 简 称: POM) 的概念&#xff0c;通过描述项目的依赖、结构、生命周期等&#xff0c;简化项目管理。 官网&#xff1a; https://maven.apache.org…

Spring、SpringBoot 框架功能学习

目录 一. Spring核心功能 二. Spring与SpringBoot区别 三. Spring与SpringMVC区别 四. SpringBoot与SpringCloud区别 五. 微服务组件 一. Spring核心功能 依赖注入&#xff08;DI&#xff09;&#xff1a;Spring的核心功能是通过依赖注入来管理对象之间的依赖关系。依赖注…

第L4周:机器学习-KNN总结-分类

&#x1f368; 本文为&#x1f517;365天深度学习训练营 中的学习记录博客&#x1f356; 原作者&#xff1a;K同学啊 概念&#xff1a; 在第L4周&#xff1a;机器学习-K-邻近算法模型&#xff08;KNN&#xff09;-CSDN博客中学习了KNN的基本概念&#xff0c;本次主要加深印象&a…

scrapy 爬取微博(四)【最新超详细解析】: 设计篇

一、功能设计 开始开发之前我们先对本文的scrapy微博爬虫工程进行一个功能的设计&#xff0c;包含的功能模块如下&#xff1a; 功能模块具体描述微博文章爬取根据关键词、时间范围等参数爬取微博文章&#xff0c;获取用户名、ID、微博mid、微博内容、点赞、转发、评论等数据微…

全国各省市生产总值指数-工业增加值指数(1999-2020年)

工业增加值指的是工业企业在一定时期内通过生产活动创造的新增价值&#xff0c;它等于工业总产值减去工业中间投入的差额。这一指标的计算可以采用生产法和收入法两种方式。生产法通过计算工业总产值与中间消耗的差额来得到&#xff0c;而收入法则将工业增加值视为固定资产折旧…

HarmonyOS Next(纯血鸿蒙)它到底像谁

前言 24年的第1天有写过一篇关于鸿蒙的文章&#xff1a;不吹不黑&#xff0c;辩证看待开发者是否需要入坑鸿蒙 后续再也没有写关于鸿蒙的文章。 没错&#xff0c;我确实入坑了鸿蒙&#xff0c;并且成功上架了几款App和元服务&#xff0c;虽然当前的用户量还比较少&#xff0c…

微信小程序——引入 iconfont 矢量图标,如何使用引用阿里巴巴矢量图标

本文介绍如何在小程序中加入图标&#xff0c;效果如下图&#xff1a; 1、访部iconfont-阿里巴巴矢量图标库 找到需要的图标&#xff0c;然后添加入库 将增加好的图标添加到项目中 2、点击更新生成代码 生成后如下图 3、打开生成的css样式文件 4、在小程序中新建/static/iconfon…

利士策分享,如何在有限的时间内过上富足的生活?

利士策分享&#xff0c;如何在有限的时间内过上富足的生活&#xff1f; 在快节奏的现代生活中&#xff0c;追求富足不仅仅是物质上的丰盈&#xff0c;更是心灵的满足与生活的平衡。 如何在有限的时间内实现这一目标&#xff0c;是许多人心中的疑问。 以下是一些实用建议&#…

Ubuntu 20.04 内核升级后网络丢失问题的解决过程

在 Ubuntu 系统中&#xff0c;内核升级是一个常见的操作&#xff0c;旨在提升系统性能、安全性和兼容性。然而&#xff0c;有时这一操作可能会带来一些意外的副作用&#xff0c;比如导致网络功能的丧失。 本人本来是想更新 Nvidia 显卡的驱动&#xff0c;使用 ubuntu-drivers …

postman中使用Pre-request Script

一、get方法 get请求时 &#xff0c;有多个params&#xff0c;并且有一个参数为sign&#xff0c;这个参数是有其他params拼接之后md5加密得到的&#xff0c;如何通过js语句获取params参数并生成sign。 const CryptoJS require(crypto-js); // 引入 CryptoJS 库进行 MD5 加密…

安卓数据存储——SQLite

一、SQLite数据库 创建表 CREATE TABLE IF NOT EXISTS user_info (_id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,name VARCHAR NOT NULL,age INTEGER NOT NULL,height LONG NOT NULL,weight FLOAT NOT NULL);注&#xff1a; IF NOT EXISTS&#xff1a;如果该表不存在则创…

Docker更换阿里容器镜像源

以Mac为例&#xff0c; 一、获取阿里容器镜像加速器地址 访问阿里云官网https://cn.aliyun.com/ 登录阿里云&#xff0c;没有账号就注册一个 登录完成后在搜索框搜索&#xff0c;容器镜像服务&#xff0c;并打开 点击管理控制台&#xff0c;进入管理控制台 左侧点击镜像加速…

ubuntu重新安装clickhouse

1.卸载clickhouse 关闭原来的clickhouse sudo systemctl stop clickhouse-server 查看关闭clickhouse是否成功 sudo systemctl status clickhouse-server 备份配置文件 /etc/clickhouse-server/user.xml /etc/clickhouse-server/config.d/metrika.xml /etc/clickhouse…

蚂蚁Raft一致性算法库SOFAJRaft深入分析

大家好&#xff0c;我是 V 哥&#xff0c;SOFAJRaft 是蚂蚁金服开源的一个基于 Raft 共识算法的 Java 实现&#xff0c;它特别适合高负载、低延迟的分布式系统场景。SOFAJRaft 支持 Multi-Raft-Group&#xff0c;能够同时处理多个 Raft 集群&#xff0c;具有扩展性和强一致性保…

实验室ICPR 2024论文分享┆FPMT: 基于增强型半监督模型的交通事件检测(含详细视频解读)

目录 论文分享简介 1. 会议介绍 2. 研究背景及主要贡献 3. 方法 4. 实验 5. 结论 6. 论文介绍视频 论文分享简介 本推文详细介绍了一篇实验室的最新论文成果《FPMT: Enhanced Semi-Supervised Model for Traffic Incident Detection》&#xff0c;该论文已被第27届国际…

尚硅谷———-乐(智)尚代驾~~--------Day5----司机认证篇~

前言&#xff1a; Hello亲爱的uu们&#xff0c;在读过了一个愉快的周末后&#xff08;摸鱼了一会&#xff09;&#xff0c;我又回来更新啦&#xff0c;感谢uu们的阅读&#xff0c;话不多说~ 司机认证 当司机点击开始接单的时候&#xff0c;会先判断该司机有没有通过认证&…

关于PCA的一份介绍

在这篇文章中&#xff0c;我将介绍机器学习中的一种无监督学习算法——PCA&#xff0c;因为它主要有两种用途&#xff0c;即降维与特征提取&#xff0c;所以我将将围绕这两种用途来介绍它&#xff0c;包括基本概念&#xff0c;应用与代码实践。 一、 PCA 1.1 概念 PCA&#…