手搓线程池
- 线程池工作原理和实现
- 线程池工作原理
- 1. 线程池的基本组成:
- 2. 线程池的基本执行流程:
- 3. 线程池的核心参数:
- 4. 线程池的生命周期:
- 5. 线程池的执行策略:
- 相关知识点
- 线程与进程的比较
- 读写锁
- 互斥锁
- 基于C语言的线程池设计与实现
- 基于C++的线程池设计与实现
线程池工作原理和实现
线程池工作原理
线程池(Thread Pool)是一种预先创建和管理一组工作线程的技术,用来优化并发任务的执行。通过复用这些线程来执行多个任务,线程池可以减少线程创建和销毁的开销,提高系统的性能和响应速度。
1. 线程池的基本组成:
- 任务队列(Task Queue):存放待执行任务的队列。当有新的任务提交时,它会进入任务队列,等待可用线程来执行。
- 工作线程(Worker Threads):线程池中的一组线程,用来处理任务队列中的任务。线程池启动时会创建一定数量的工作线程。
- 线程池管理器(Thread Pool Manager):负责管理线程池的大小、任务分配、线程的创建和销毁。
2. 线程池的基本执行流程:
- 提交任务:用户向线程池提交任务,通常是实现了
Runnable
或Callable
接口的对象。任务被放入任务队列中。 - 任务分配:线程池中的某个空闲工作线程会从任务队列中取出一个任务,并开始执行。任务队列是线程安全的,确保多个线程可以同时取任务而不发生冲突。
- 任务执行:工作线程运行并执行任务中的代码。线程池中的工作线程是循环的,每个线程在完成一个任务后会继续取下一个任务。
- 复用线程:任务完成后,线程不会被销毁,而是回到线程池中成为“空闲线程”,等待下一个任务。
- 动态调整线程数量:如果任务数量激增,线程池可以根据策略(如扩展线程池的大小)创建新的线程来处理更多的任务;如果任务减少,线程池也可能会销毁一些线程以节省资源。
3. 线程池的核心参数:
- 核心线程数(corePoolSize):线程池中保持活跃的核心线程数量,即使线程处于空闲状态也不会被回收。
- 最大线程数(maximumPoolSize):线程池中允许的最大线程数量。当任务数量超过核心线程数时,线程池会根据需求创建更多的线程,直到达到最大线程数。
- 任务队列(workQueue):用于存放等待执行的任务。如果当前所有线程都在忙碌,新的任务会被放入任务队列中。
- 线程存活时间(keepAliveTime):线程池中超过核心线程数的空闲线程在等待新任务的最长等待时间,超过这个时间后将被销毁。
- 线程工厂(ThreadFactory):用于创建新线程,方便自定义线程的属性,如线程的名称、优先级等。
- 拒绝策略(RejectedExecutionHandler):当任务过多而无法处理时(任务队列满了且线程池的线程数已达到上限),线程池会执行拒绝策略,常见策略包括丢弃任务、抛出异常、或由调用者线程直接执行。
4. 线程池的生命周期:
- 运行状态(RUNNING):线程池处于运行状态,可以接受任务并处理任务。
- 关闭状态(SHUTDOWN):线程池不再接受新任务,但会继续处理已经提交的任务。
- 停止状态(STOP):线程池不再接受任务,且会中断正在执行的任务。
- 终止状态(TERMINATED):所有任务执行完毕,线程池中的线程全部销毁,线程池彻底关闭。
5. 线程池的执行策略:
- 先使用核心线程:线程池优先利用核心线程来处理任务,只有在核心线程全部繁忙的情况下,才会将任务放入任务队列。
- 任务队列满了,创建新线程:如果任务队列已满且所有核心线程都在工作,线程池会创建新的线程,直到达到
maximumPoolSize
。 - 拒绝任务:如果任务队列满了,且线程池中的线程数量已经达到
maximumPoolSize
,根据配置的拒绝策略处理新任务。
相关知识点
线程与进程的比较
- 线程启动速度快,轻量级
- 线程使用有一定难度,需要处理数据一致性问题
- 同一线程共享的有堆、全局变量、静态变量、指针等,而独自占有栈
- 线程是调度的基本单位(PC、状态码、通用寄存器、线程栈及栈指针);进程是拥有资源的基本单位(打开文件、堆、代码段等)
- 一个进程内多个线程可以并发;多个进程可以并发
- 拥有资源:线程不拥有系统资源,但一个进程的多个线程可以共享隶属进程的资源;进程是拥有资源的独立单位
- 线程的系统开销小,线程创建销毁只需要处理PC值,状态码,通用寄存器值,线程栈及栈指针即可;进程创建和销毁需要重新分配及销毁task_struct结构。
读写锁
- 多个读者可以同时进行读
- 写者必须互斥(只允许一个写者写,也不能读者写者同时进行)
- 写者优先于读者(一旦有写者,则后续读者必须等待,唤醒时优先考虑写者)
互斥锁
一次只能一个线程拥有互斥锁,其他线程只有等待。
互斥锁是在抢锁失败的情况下主动放弃CPU进入睡眠状态直到锁的状态改变时再唤醒,而操作系统负责线程调度,为了实现锁的状态发生改变时唤醒阻塞的线程或者进程,需要把锁交给操作系统管理,所以互斥锁在加锁操作时设计上下文的切换。互斥锁实际的效率还是可以让人接受的,加锁的时间大概100ns左右,而实际上互斥锁的一种可能的实现是先自旋一段时间,当自旋的时间超过阈值之后再将线程投入到睡眠中,因此在并发运算中使用互斥锁(每次占用锁的时间很短)的效果可能不亚于使用自旋锁。
互斥锁属于sleep-waiting
类型的锁。例如在一个双核的机器上有两个线程A和B,它们分别运行在core 0和core 1上。假设线程A想要通过pthread_mutex_lock
操作去得到一个临界区的锁,而此时这个锁正被线程B所持有,那么线程A就会被阻塞,此时会通过上下文切换将线程A置于等待队列中,此时core 0就可以运行其他的任务(如线程C)。
基于C语言的线程池设计与实现
任务队列
typedef struct Task
{
void (*function) (void* arg); // void*是一个泛型,能够接收各种各样的数据类型
void* arg;
}Task;
线程池定义
struct ThreadPool
{
// 任务队列
Task* taskQ; // 队列数组
int queueCapacity; // 容量
int queueSize; // 当前任务个数
int queueFront; // 队头,取数据
int queueRear; // 队尾,放数据
pthread_t managerID; // 管理者线程ID
pthread_t *threadIDs; // 工作的线程ID
int minNum; // 最小的线程数
int maxNum; // 最大的线程数
int busyNum; // 工作的线程个数
int liveNum; // 存活的线程个数
int exitNum; // 要杀死的线程个数
pthread_mutex_t mutexpool; // 锁住整个线程池
pthread_mutex_t mutexBusy; // 锁住busyNum变量
pthread_cond_t notFull; // 任务队列是否满了
pthread_cond_t notEmpty; // 任务队列是否空了
int shutdown; // 是否销毁线程池,销毁为1,不销毁为0
};
头文件声明
#ifndef _THREADPOOL_H
#define _THREADPOOL_H
typedef struct ThreadPool ThreadPool;
// 创建线程池并初始化
ThreadPool* threadPoolCreate(int min, int max, int queueSize);
// 销毁线程池
int threadPoolDestory(ThreadPool* pool);
// 给线程池添加任务
void threadPoolAdd(ThreadPool* pool, void(*func)(void*), void* arg);
// 获取线程池中工作的线程的个数
int threadPoolBusyNum(ThreadPool* pool);
// 获取线程池中活着的线程的个数
int threadPoolAliveNum(ThreadPool* pool);
void* worker(void* arg);
void* manager(void* arg);
void threadExit(ThreadPool* pool);
#endif
源文件定义
#include "threadpool.h"
#include<pthread.h>
#include<stdlib.h>
#include<string.h>
#include<unistd.h>
#include<stdio.h>
const int NUMBER = 2;
// 任务结构体
typedef struct Task
{
// void*是一个泛型,能够接收各种各样的数据类型
void (*function) (void* arg); // 函数指针
void* arg;
}Task;
// 线程池结构体
struct ThreadPool
{
// 任务队列
Task* taskQ; // 队列数组
int queueCapacity; // 容量
int queueSize; // 当前任务个数
int queueFront; // 队头,取数据
int queueRear; // 队尾,放数据
pthread_t managerID; // 管理者线程ID
pthread_t *threadIDs; // 工作的线程ID
int minNum; // 最小的线程数
int maxNum; // 最大的线程数
int busyNum; // 工作的线程个数
int liveNum; // 存活的线程个数
int exitNum; // 要杀死的线程个数
pthread_mutex_t mutexpool; // 锁住整个线程池
pthread_mutex_t mutexBusy; // 锁住busyNum变量
pthread_cond_t notFull; // 任务队列是否满了,用于阻塞生产者
pthread_cond_t notEmpty; // 任务队列是否空了,用于阻塞消费者
int shutdown; // 是否销毁线程池,销毁为1,不销毁为0
};
ThreadPool* threadPoolCreate(int min, int max, int queueSize) {
ThreadPool* pool = (ThreadPool*)malloc(sizeof(ThreadPool));
do {
if(pool == NULL) {
printf("malloc threadpool fail....\n");
break;
}
pool->threadIDs = (pthread_t*)malloc(sizeof(pthread_t) * max);
if(pool->threadIDs == NULL) {
printf("malloc threadIDs fail....\n");
break;
}
memset(pool->threadIDs, 0, sizeof(pthread_t) * max);
pool->minNum = min;
pool->maxNum = max;
pool->busyNum = 0;
pool->liveNum = min; // 和最小个数相等
pool->exitNum = 0;
if(pthread_mutex_init(&pool->mutexpool, NULL) != 0 ||
pthread_mutex_init(&pool->mutexBusy, NULL) != 0 ||
pthread_cond_init(&pool->notEmpty, NULL) != 0 ||
pthread_cond_init(&pool->notFull, NULL) != 0
) {
printf("mutex or condif init fail....\n");
break;
}
// 任务队列
pool->taskQ = (Task*)malloc(sizeof(Task) * queueSize);
pool->queueCapacity = queueSize;
pool->queueSize = 0;
pool->queueFront = 0;
pool->queueRear = 0;
pool->shutdown = 0;
// 创建管理者线程和工作者线程
pthread_create(&pool->managerID, NULL, manager, pool); // 第三个参数为管理者线程的任务函数
for(int i = 0; i < min; i++) {
pthread_create(&pool->threadIDs[i], NULL, worker, pool); // 第三个参数为工作的线程的任务函数
}
return pool;
} while(0);
// 释放资源
if(pool->threadIDs) free(pool->threadIDs);
if(pool->taskQ) free(pool->taskQ);
if(pool) free(pool);
return NULL;
}
int threadPoolDestory(ThreadPool* pool) {
if(pool == NULL) {
return -1;
}
// 关闭线程池
pool->shutdown = 1;
// 阻塞回收管理者线程
pthread_join(pool->managerID, NULL);
// 唤醒阻塞的消费者线程
for(int i = 0; i < pool->liveNum; i++) {
pthread_cond_signal(&pool->notEmpty);
}
// 释放堆内存
if(pool->taskQ) {
free(pool->taskQ);
}
if(pool->threadIDs) {
free(pool->threadIDs);
}
pthread_mutex_destroy(&pool->mutexpool);
pthread_mutex_destroy(&pool->mutexBusy);
pthread_cond_destroy(&pool->notEmpty);
pthread_cond_destroy(&pool->notFull);
free(pool);
pool = NULL;
return 0;
}
void threadPoolAdd(ThreadPool* pool, void(*func)(void*), void* arg) {
pthread_mutex_lock(&pool->mutexpool);
while(pool->queueSize == pool->queueCapacity && !pool->shutdown) {
// 阻塞生产者线程
pthread_cond_wait(&pool->notFull, &pool->mutexpool);
}
if(pool->shutdown) {
pthread_mutex_unlock(&pool->mutexpool);
return;
}
// 添加任务
pool->taskQ[pool->queueRear].function = func;
pool->taskQ[pool->queueRear].arg = arg;
pool->queueRear = (pool->queueRear + 1) % pool->queueCapacity;
pool->queueSize++;
pthread_cond_signal(&pool->notEmpty); // 唤醒阻塞在条件变量里的工作的线程,即生产者生产后唤醒消费者
pthread_mutex_unlock(&pool->mutexpool);
}
int threadPoolBusyNum(ThreadPool* pool) {
pthread_mutex_lock(&pool->mutexBusy);
int busyNum = pool->busyNum;
pthread_mutex_unlock(&pool->mutexBusy);
return busyNum;
}
int threadPoolAliveNum(ThreadPool* pool) {
pthread_mutex_lock(&pool->mutexpool);
int liveNum = pool->liveNum;
pthread_mutex_unlock(&pool->mutexpool);
return liveNum;
}
void* worker(void* arg) {
ThreadPool* pool = (ThreadPool*)arg;
while(1) {
pthread_mutex_lock(&pool->mutexpool); // 访问线程池之前加锁
// 当前任务队列是否为空
while(pool->queueSize == 0 && !pool->shutdown) { // 任务队列为空并且线程池没有被关闭
// 阻塞工作线程
pthread_cond_wait(&pool->notEmpty, &pool->mutexpool);
// 判断是不是要销毁线程
if(pool->exitNum > 0) {
pool->exitNum--;
if(pool->liveNum > pool->minNum) {
pool->liveNum--;
pthread_mutex_unlock(&pool->mutexpool); // 阻塞的时候已经获得锁,已经锁上,如果不解开就会死锁
threadExit(pool);
}
}
}
// 判断线程池是否被关闭了
if(pool->shutdown) {
pthread_mutex_unlock(&pool->mutexpool); // 避免死锁
threadExit(pool);
}
// 从任务队列中取出一个任务
Task task;
task.function = pool->taskQ[pool->queueFront].function;
task.arg = pool->taskQ[pool->queueFront].arg;
// 移动头结点
pool->queueFront = (pool->queueFront + 1) % pool->queueCapacity; // 循环队列
pool->queueSize--;
pthread_cond_signal(&pool->notFull); // 消费者消费完产品后唤醒生产者
pthread_mutex_unlock(&pool->mutexpool); // 用完之后解锁
printf("thread %ld start working...\n", pthread_self());
pthread_mutex_lock(&pool->mutexBusy);
pool->busyNum++;
pthread_mutex_unlock(&pool->mutexBusy);
task.function(task.arg);
free(task.arg);
task.arg = NULL;
printf("thread %ld end working...\n", pthread_self());
pthread_mutex_lock(&pool->mutexBusy);
pool->busyNum--;
pthread_mutex_unlock(&pool->mutexBusy);
}
return NULL;
}
void* manager(void* arg) {
ThreadPool* pool = (ThreadPool*)arg;
while(!pool->shutdown) {
// 每隔三秒钟检测一次
sleep(3);
// 取出线程池中任务的数量和当前线程的数量
pthread_mutex_lock(&pool->mutexpool);
int queueSize = pool->queueSize;
int liveNumber = pool->liveNum;
pthread_mutex_unlock(&pool->mutexpool);
// 取出忙的线程的数量
pthread_mutex_lock(&pool->mutexBusy);
int busyNumber = pool->busyNum;
pthread_mutex_unlock(&pool->mutexBusy);
// 添加线程
// 任务的个数 大于 存活的线程个数 && 存活的线程数 小于 最大线程数
if(queueSize > liveNumber && liveNumber < pool->maxNum) {
pthread_mutex_lock(&pool->mutexpool);
int count = 0;
for(int i = 0; i < pool->maxNum && count < NUMBER && liveNumber < pool->maxNum; i++) {
if(pool->threadIDs[i] == 0) {
pthread_create(&pool->threadIDs[i], NULL, worker, pool);
count++;
pool->liveNum++;
}
}
pthread_mutex_unlock(&pool->mutexpool);
}
// 销毁线程
// 忙的线程 * 2 小于 存活的线程数 && 存活的线程 大于 最小线程数
// 之所以不用对pool->minNum加锁,是因为这个值是固定的,只需要读不需要写
if(busyNumber * 2 < liveNumber && liveNumber > pool->minNum) {
pthread_mutex_lock(&pool->mutexpool);
pool->exitNum = NUMBER;
pthread_mutex_unlock(&pool->mutexpool);
// 让工作的线程自杀
for(int i = 0; i < NUMBER; i++) {
pthread_cond_signal(&pool->notEmpty);
}
}
}
}
void threadExit(ThreadPool* pool) {
pthread_t tid = pthread_self(); // 获得线程自身的ID。
for(int i = 0; i < pool->maxNum; i++) {
if(pool->threadIDs[i] == tid) { // tid对应的线程要退出了
pool->threadIDs[i] = 0;
printf("threadExit() called, %ld exiting...\n", tid);
break;
}
}
pthread_exit(NULL);
}
测试代码
#include<stdio.h>
#include "threadpool.h"
#include<pthread.h>
#include<unistd.h>
#include<stdlib.h>
#include<string.h>
void taskFunc(void* arg) {
int num = *(int*)arg;
printf("thread %ld is working, number = %d\n", pthread_self(), num);
sleep(1);
}
int main() {
// 创建线程池
ThreadPool* pool = threadPoolCreate(3, 10, 100);
for(int i = 0; i < 100; i++) {
int* num = (int*)malloc(sizeof(int));
*num = i + 100;
threadPoolAdd(pool, taskFunc, num);
}
sleep(30); // 等待子线程把任务处理完毕
threadPoolDestory(pool);
return 0;
}
基于C++的线程池设计与实现
因为在C++中,delete task.arg时候,由于delete void*类型是有危险的,因为void*指针只占四个字节,因此有可能不能全部地被释放,为了知道在程序中void*实际上是什么类型,因此在C++中可以使用模板来解决这一问题,因为模板可以传递类型。
任务队列声明
#pragma once
#include<queue>
#include<pthread.h>
using callback = void (*) (void* arg);
// 任务结构体
template <typename T>
struct Task
{
Task<T>() {
function = nullptr;
arg = nullptr;
}
Task<T>(callback f, void* arg) {
this->arg = (T*)arg;
function = f;
}
callback function; // 函数指针
T* arg;
};
template <typename T>
class TaskQueue
{
private:
std::queue <Task<T>> m_taskQ;
pthread_mutex_t m_mutex; // 互斥锁
public:
TaskQueue(/* args */);
~TaskQueue();
// 添加任务
void addTask(Task<T> task);
void addTask(callback f, void* arg);
// 取出一个任务
Task<T> takeTask();
// 获取当前任务的个数
inline size_t taskNumber() { // 没有if判断什么的,比较简单的直接写成内联函数比较好
return m_taskQ.size();
}
};
任务队列定义
#include "TaskQueue.h"
template <typename T>
TaskQueue<T>::TaskQueue(/* args */)
{
pthread_mutex_init(&m_mutex, NULL);
}
template <typename T>
TaskQueue<T>::~TaskQueue()
{
pthread_mutex_destroy(&m_mutex);
}
template <typename T>
void TaskQueue<T>::addTask(Task<T> task)
{
pthread_mutex_lock(&m_mutex);
m_taskQ.push(task);
pthread_mutex_unlock(&m_mutex);
}
template <typename T>
void TaskQueue<T>::addTask(callback f, void* arg)
{
pthread_mutex_lock(&m_mutex);
m_taskQ.push(Task<T>(f, arg));
pthread_mutex_unlock(&m_mutex);
}
template <typename T>
Task<T> TaskQueue<T>::takeTask()
{
Task<T> task;
pthread_mutex_lock(&m_mutex);
if(!m_taskQ.empty()) {
task = m_taskQ.front();
m_taskQ.pop();
}
pthread_mutex_unlock(&m_mutex);
return task;
}
线程池声明
#pragma once
#include "TaskQueue.h"
#include "TaskQueue.cpp"
template <typename T>
class ThreadPool
{
public:
ThreadPool(int min, int max);
~ThreadPool();
// 添加任务
void addTask(Task<T> task);
// 获取忙线程的个数
int getBusyNumber();
// 获取活着的线程个数
int getAliveNumber();
private:
// 工作的线程的任务函数
static void* worker(void* arg);
// 管理者线程的任务函数
static void* manager(void* arg);
void threadExit();
static const int NUMBER = 2;
pthread_mutex_t mutexPool;
pthread_cond_t notEmpty;
pthread_t* threadIDs;
pthread_t managerID;
TaskQueue<T>* taskQ;
int minNum;
int maxNum;
int busyNum;
int liveNum;
int exitNum;
bool shutdown = false;
};
线程池定义
#include "ThreadPool.h"
#include <iostream>
#include <string.h>
#include <string>
#include <unistd.h>
using namespace std;
template <typename T>
ThreadPool<T>::ThreadPool(int min, int max) {
// 实例化任务队列
do {
taskQ = new TaskQueue<T>;
if(taskQ == nullptr) {
cout << "malloc threadIDs fail...\n";
break;
}
threadIDs = new pthread_t[max];
if(threadIDs == nullptr) {
cout << "malloc threadIDs fail....\n";
break;
}
memset(threadIDs, 0, sizeof(pthread_t) * max);
minNum = min;
maxNum = max;
busyNum = 0;
liveNum = min; // 和最小个数相等
exitNum = 0;
if(pthread_mutex_init(&mutexPool, NULL) != 0 ||
pthread_cond_init(¬Empty, NULL) != 0
) {
cout << "mutex or condif init fail....\n";
break;
}
shutdown = false;
/*
为什么要把this传给manager呢?
因为manager是一个静态方法,静态方法它只能访问类里边的静态成员变量,
它不能访问类的非静态成员变量。
因此如果我们想要访问这些非静态成员变量,就必须要给这个静态方法传进去一个
实例化对象,通过传进去的这个实例化对象来访问里边的非静态成员函数和变量
*/
// 创建管理者线程和工作者线程
pthread_create(&managerID, NULL, manager, this); // 第三个参数为管理者线程的任务函数
for(int i = 0; i < min; i++) {
pthread_create(&threadIDs[i], NULL, worker, this); // 第三个参数为工作的线程的任务函数
}
return;
} while(0);
// 释放资源
if(threadIDs) delete []threadIDs;
if(taskQ) delete taskQ;
}
template <typename T>
ThreadPool<T>::~ThreadPool() {
// 关闭线程池
shutdown = true;
// 阻塞回收管理者线程
pthread_join(managerID, NULL);
// 唤醒阻塞的消费者线程
for(int i = 0; i < liveNum; i++) { // 因为到了最后没有任务了,所以活着的线程都需要关闭
pthread_cond_signal(¬Empty);
}
// 释放堆内存
if(taskQ) {
delete taskQ;
}
if(threadIDs) {
delete []threadIDs;
}
pthread_mutex_destroy(&mutexPool);
pthread_cond_destroy(¬Empty);
}
template <typename T>
void ThreadPool<T>::addTask(Task<T> task) {
if(shutdown) {
return;
}
// 添加任务
taskQ->addTask(task);
pthread_cond_signal(¬Empty); // 唤醒阻塞在条件变量里的工作的线程,即生产者生产后唤醒消费者
}
template <typename T>
int ThreadPool<T>::getBusyNumber() {
pthread_mutex_lock(&mutexPool);
int busyNum = this->busyNum;
pthread_mutex_unlock(&mutexPool);
return busyNum;
}
template <typename T>
int ThreadPool<T>::getAliveNumber() {
pthread_mutex_lock(&mutexPool);
int liveNum = this->liveNum;
pthread_mutex_unlock(&mutexPool);
return liveNum;
}
template <typename T>
void* ThreadPool<T>::worker(void* arg) {
ThreadPool* pool = static_cast<ThreadPool*>(arg); // static_cast相当于c里面的强制类型转换
while(1) {
pthread_mutex_lock(&pool->mutexPool); // 访问线程池之前加锁
// 当前任务队列是否为空
while(pool->taskQ->taskNumber() == 0 && !pool->shutdown) { // 任务队列为空并且线程池没有被关闭
// 阻塞工作线程
pthread_cond_wait(&pool->notEmpty, &pool->mutexPool);
// 判断是不是要销毁线程
if(pool->exitNum > 0) {
pool->exitNum--;
if(pool->liveNum > pool->minNum) {
pool->liveNum--;
pthread_mutex_unlock(&pool->mutexPool); // 阻塞的时候已经获得锁,已经锁上,如果不解开就会死锁
pool->threadExit();
}
}
}
// 判断线程池是否被关闭了
if(pool->shutdown) {
pthread_mutex_unlock(&pool->mutexPool); // 避免死锁
pool->threadExit();
}
// 从任务队列中取出一个任务
Task<T> task = pool->taskQ->takeTask();
pool->busyNum++;
// 因为任务队列可以无限大,所以生产者可以不用唤醒
pthread_mutex_unlock(&pool->mutexPool); // 用完之后解锁
printf("thread %ld start working...\n", pthread_self());
task.function(task.arg);
delete task.arg; //
task.arg = nullptr;
printf("thread %ld end working...\n", pthread_self());
pthread_mutex_lock(&pool->mutexPool);
pool->busyNum--;
pthread_mutex_unlock(&pool->mutexPool);
}
return NULL;
}
template <typename T>
void* ThreadPool<T>::manager(void* arg) {
ThreadPool* pool = static_cast<ThreadPool*>(arg);
while(!pool->shutdown) {
// 每隔三秒钟检测一次
sleep(3);
// 取出线程池中任务的数量和当前线程的数量
pthread_mutex_lock(&pool->mutexPool);
int queueSize = pool->taskQ->taskNumber();
int liveNumber = pool->liveNum;
int busyNumber = pool->busyNum; // 取出忙的线程的数量
pthread_mutex_unlock(&pool->mutexPool);
// 添加线程
// 任务的个数 大于 存活的线程个数 && 存活的线程数 小于 最大线程数
if(queueSize > liveNumber && liveNumber < pool->maxNum) {
pthread_mutex_lock(&pool->mutexPool);
int count = 0;
for(int i = 0; i < pool->maxNum && count < NUMBER && liveNumber < pool->maxNum; i++) {
if(pool->threadIDs[i] == 0) {
pthread_create(&pool->threadIDs[i], NULL, worker, pool);
count++;
pool->liveNum++;
}
}
pthread_mutex_unlock(&pool->mutexPool);
}
// 销毁线程
// 忙的线程 * 2 小于 存活的线程数 && 存活的线程 大于 最小线程数
// 之所以不用对pool->minNum加锁,是因为这个值是固定的,只需要读不需要写
if(busyNumber * 2 < liveNumber && liveNumber > pool->minNum) {
pthread_mutex_lock(&pool->mutexPool);
pool->exitNum = NUMBER;
pthread_mutex_unlock(&pool->mutexPool);
// 让工作的线程自杀
for(int i = 0; i < NUMBER; i++) {
// 唤醒已经被阻塞的工作的线程,因为工作的线程无事可做,肯定是被阻塞在条件变量中
pthread_cond_signal(&pool->notEmpty);
}
}
}
}
template <typename T>
void ThreadPool<T>::threadExit() {
pthread_t tid = pthread_self(); // 获得当前线程的线程ID。
for(int i = 0; i < maxNum; i++) {
if(threadIDs[i] == tid) { // tid对应的线程要退出了
threadIDs[i] = 0;
printf("threadExit() called, %ld exiting...\n", tid);
break;
}
}
pthread_exit(NULL);
}
测试
#include "ThreadPool.h"
#include "ThreadPool.cpp"
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
void taskFunc(void* arg) {
int num = *(int*)arg;
printf("thread %ld is working, number = %d\n", pthread_self(), num);
sleep(1);
}
int main() {
// 创建线程池
ThreadPool<int> pool(3, 10);
for(int i = 0; i < 100; i++) {
int* num = new int(i + 100);
pool.addTask(Task<int>(taskFunc, num));
}
sleep(20); // 等待子线程把任务处理完毕
return 0;
}
g++ main.cpp -o main -lpthread