1.线程池原理
创建一个线程,实现很方便。
缺点:若并发的线程数量很多,并且每个线程都是执行一个时间较短的任务就结束了。
由于频繁的创建线程和销毁线程需要时间,这样的频繁创建线程会大大降低
系统的效率。
2.思考 :怎么样实现线程复用,让线程执行完一个任务后不被销毁,还可以继续执行其他的任务?
答:线程池
3.思考 :什么是线程池?
可以点击看爱编程的大丙写的这边文章!写的很好
4.线程池的组成:任务队列、工作的线程、管理者线程
(1)任务队列
听到任务队列之后,咱们在脑海里应该闪现出另外一个模型,就是生产者和消费者模型。因为只要
有任务队列,就离不开生产者和消费者模型。为什么要有任务队列呢?就是因为他有生产者要把生
产出来的数据或者商品进行存储,存储起来之后让对应的某些消费者去消费。在我们写程序的时候,
就是有一些生产者线程还是负责往这个任务队列里边放任务,然后有一些消费者线程负责把任务从
任务队列中取出去,并且把它处理掉。在线程池里边有任务队列,也就意味着这个线程池它是生产者
和消费者模型里边的一部分。哪一个部分呢?这个线程池主要是负责维护一个任务队列,然后再维护
若干个消费者线程。它维护的线程都是消费者线程。
那么生产者是谁呢?谁使用任务队列,谁就是生产者。那么这个生产者它把任务放到任务队列里边。
怎么放呢?肯定是通过线程池提供的api接口,把这个任务放到任务队列,,放进来之后,这个消费
者线程通过一个循环,不停地从这个任务队列里边去取任务,
假设说咱们这个任务队列为空了,消费者线程就需要阻塞了。可以使用条件变量把它阻塞掉就行了。
如果让消费者线程去阻塞,它就放弃了CPU时间片了,这个对系统的资源消耗是一点都没有的。
对于生产者来说,假设这个任务队列已经满了,需要阻塞生产者。当这个消费者消费了这个产品之后,
任务队列就不再满了,那么任务队列不再满之后,咱们就让消费者把这个生产者唤醒,让他们继续生
产。这个任务队列是两个角色,而不是三个角色,这一点要搞明白的!!!
(2)工作的线程
这个任务队列里边的任务都是函数地址,工作的线程在处理这个任务的时候,它就基于这个函数地址
对那个函数进行调用。也就说这个任务队列里边的任务都是回调函数。
什么时候去回调呢?
就是当线程池里边的这个消费者线程把它取出去之后,它就被调用了。如果说没有被取出来,他就不
被调用了。
(3)管理者线程
管理者线程的职责非常单一,就是不停的去检测当前任务队列里边任务的个数,还有当前工作的线程
的线程数,然后针对于它们的数量进行一个运算。看一看现在需要添加线程还是销毁线程。干这个事
的时候,可以给它设置一个频率,比如说你可以让他五秒钟去做一次或者十秒钟去做一次。sleep管理
者是非常轻松的
(一)任务类Task的定义
#pragma once
#include <queue>
#include <pthread.h>
using callback = void (*)(void *);
// 任务结构体
template <typename T>
struct Task
{
Task<T>()
{
function = nullptr;
arg = nullptr;
}
Task<T>(callback f, void *arg)
{
this->arg = (T *)arg;
function = f;
}
callback function;
T *arg;
};
(二)任务队列TaskQueue的定义
template <typename T>
class TaskQueue
{
public:
TaskQueue();
~TaskQueue();
// 添加任务
void addTask(Task<T> task);
void addTask(callback f, void *arg);
// 取出一个任务
Task<T> takeTask();
// 获取当前任务的个数
inline size_t taskNumber()
{
return m_taskQ.size();
}
private:
std::queue<Task<T> > m_taskQ;
pthread_mutex_t m_mutex;
};
(三)线程池 ThreadPool 类的声明
#pragma once
#include "TaskQueue.h"
template <typename T>
class ThreadPool
{
public:
// 创建线程池并初始化
ThreadPool(int min, int max);
// 销毁线程池
~ThreadPool();
// 给线程池添加任务
void addTask(Task<T> task);
// 获取线程池中工作的线程的个数
int getBusyNum();
// 获取线程池中活着的线程个数
int getAliveNum();
private:
// 工作的线程是(消费者线程)任务函数
static void *worker(void *arg);
// 管理者线程任务函数
static void *manager(void *arg);
// 单个线程退出
void threadExit();
private:
// 任务队列
TaskQueue<T> *taskQ;
pthread_t managerID; // 管理者线程ID
pthread_t *threadIDs; // 工作的线程ID
int minNum; // 最小线程数量
int maxNum; // 最大线程数量
int busyNum; // 忙的线程的个数
int liveNum; // 存活的线程的个数
int exitNum; // 要销毁的线程个数
pthread_mutex_t mutexPool; // 锁整个的线程池
pthread_cond_t notEmpty; // 任务队列是不是空了
bool shutdown = false; // 是不是要销毁线程池
static const int NUMBER = 2;
};
(四)线程池 构造函数
template <typename T>
ThreadPool<T>::ThreadPool(int min, int max)
{
do
{
// 实例化任务队列
taskQ = new TaskQueue<T>;
if (taskQ == nullptr)
{
std::cout << "new taskQ fail..." << std::endl;
break;
}
// 根据线程的最大上限给线程数组分配内存
threadIDs = new pthread_t[max];
if (threadIDs == nullptr)
{
std::cout << "new threadIDs fail..." << std::endl;
break;
}
// 初始化
memset(threadIDs, 0, sizeof(pthread_t) * max);
minNum = min;
maxNum = max;
busyNum = 0;
liveNum = min; // 和最小个数相等
exitNum = 0;
// 初始化互斥锁,条件变量
if (pthread_mutex_init(&mutexPool, NULL) != 0 ||
pthread_cond_init(¬Empty, NULL) != 0)
{
std::cout << "mutex or condition init fail...\n"
<< std::endl;
break;
}
shutdown = false;
/// 创建线程 //
//"void *(ThreadPool::*)(void *arg)" 类型的实参与 "void *(*)(void *)" 类型的形参不兼容
// 类的静态成员或者类的外部的非类普通函数,只要定义出来之后就有地址
// 如果是类的成员函数,并且不是静止的,这个函数定义出来是没有地址的
// 什么时候有地址呢?我们必须要给这个类进行实例化,创建类的对象,这个
// 函数才有地址
// this指针指向当前被实例化的对象,如果要是在外边new出来一个ThreadPool对象,
// 那么这个this指针就指向该实例化对象
// 为什么要把this传给manager呢?
// 因为manager是一个静态方法,静态方法它只能访问类里边的静态成员变量,
// 它不能访问类的非静态成员变量。
// 因此如果我们想要访问这些非静态成员变量,就必须要给这个静态方法传进去一个
// 实例化对象,通过传进去的这个实例化对象来访问里边的非静态成员函数和变量
// 创建管理者线程,1个
pthread_create(&managerID, NULL, manager, this);
// 根据最小线程个数,创建线程
for (int i = 0; i < min; i++)
{
pthread_create(&threadIDs[i], NULL, worker, this);
}
return;
} while (0);
// 释放资源
if (threadIDs)
delete[] threadIDs;
if (taskQ)
delete taskQ;
}
(五)工作的线程的任务函数
// 工作线程任务函数
template <typename T>
void *ThreadPool<T>::worker(void *arg)
{
ThreadPool *pool = static_cast<ThreadPool *>(arg);
// 一直不停的工作
while (true)
{
// 访问任务队列(共享资源)加锁
pthread_mutex_lock(&pool->mutexPool);
// 当前任务队列是否为空,如果为空工作线程阻塞
while (pool->taskQ->taskNumber() == 0 && !pool->shutdown)
{
//printf("thread %ld waiting...\n",pthread_self());
// 阻塞工作线程
pthread_cond_wait(&pool->notEmpty, &pool->mutexPool);
// 解除阻塞之后,判断是不是要销毁线程
if (pool->exitNum > 0)
{
pool->exitNum--;
if (pool->liveNum > pool->minNum)
{
pool->liveNum--;
pthread_mutex_unlock(&pool->mutexPool);
pool->threadExit();
}
}
}
// 判断线程池是否被关闭了
if (pool->shutdown)
{
pthread_mutex_unlock(&pool->mutexPool);
pool->threadExit();
}
// 从任务队列中取出一个任务
Task<T> task = pool->taskQ->takeTask();
// 工作的线程+1
pool->busyNum++;
// 线程池解锁
pthread_mutex_unlock(&pool->mutexPool);
// 执行任务
printf("thread %ld start working...\n", pthread_self());
// std::cout<<"thread "<<to_string(pthread_self()) <<" start working..."<<std::endl;
task.function(task.arg);
delete task.arg;
task.arg = nullptr;
// 任务处理结束
printf("thread %ld end working...\n", pthread_self());
// std::cout<<"thread "<<std::string to_string(pthread_self()) <<" end working..."<<std::endl;
pthread_mutex_lock(&pool->mutexPool);
pool->busyNum--;
pthread_mutex_unlock(&pool->mutexPool);
}
return nullptr;
}
(六)线程退出函数
//线程退出
template <typename T>
void ThreadPool<T>::threadExit()
{
pthread_t tid = pthread_self();
for (int i = 0; i < maxNum; ++i)
{
if (threadIDs[i] == tid)
{
threadIDs[i] = 0;
// std::cout<<"threadExit() called, "<<std::string to_string(tid)<<" exiting..."<<std::endl;
printf("threadExit() called,%ld exiting...\n", tid);
break;
}
}
pthread_exit(NULL); // 这个是标准C的函数
}
(七)管理者线程的任务函数
// 管理者线程任务函数
template <typename T>
void *ThreadPool<T>::manager(void *arg)
{
ThreadPool *pool = static_cast<ThreadPool *>(arg);
// 如果线程池没有关闭,就一直检测
while (!pool->shutdown)
{
// 每隔3s检测一次
sleep(3);
// 取出线程池中任务的数量和当前线程的数量 取出(工作的)忙的线程的数量
pthread_mutex_lock(&pool->mutexPool);
int queueSize = pool->taskQ->taskNumber();
int liveNum = pool->liveNum;
int busyNum = pool->busyNum;
pthread_mutex_unlock(&pool->mutexPool);
// 添加线程
// 任务的个数>存货的线程个数 && 存活的线程数 < 最大线程数
if (queueSize > liveNum && liveNum < pool->maxNum)
{
pthread_mutex_lock(&pool->mutexPool);
int counter = 0;
for (int i = 0; i < pool->maxNum && counter < NUMBER && pool->liveNum < pool->maxNum; ++i)
{
if (pool->threadIDs[i] == 0)
{
pthread_create(&pool->threadIDs[i], NULL, worker, pool);
counter++;
pool->liveNum++;
}
}
pthread_mutex_unlock(&pool->mutexPool);
}
// 销毁多余的线程
// 忙的线程 * 2 < 存活的线程数 && 存活的线程 > 最小线程数
if (busyNum * 2 < liveNum && liveNum > pool->minNum)
{
pthread_mutex_lock(&pool->mutexPool);
pool->exitNum = NUMBER;
pthread_mutex_unlock(&pool->mutexPool);
// 让工作的线程自杀
for (int i = 0; i < NUMBER; ++i)
{
pthread_cond_signal(&pool->notEmpty);
}
}
}
return nullptr;
}
(八)给线程池添加任务
// 给线程池添加任务
template <typename T>
void ThreadPool<T>::addTask(Task<T> task)
{
if (shutdown)
{
return;
}
// 添加任务,不需要加锁,任务队列中有锁
taskQ->addTask(task);
// 唤醒工作的线程
pthread_cond_signal(¬Empty); // 通知消费者消费
}
(九)获取线程池忙的线程个数和活着的线程个数
template <typename T>
int ThreadPool<T>::getBusyNum()
{
pthread_mutex_lock(&mutexPool);
int busyNum = this->busyNum;
pthread_mutex_unlock(&mutexPool);
return busyNum;
}
template <typename T>
int ThreadPool<T>::getAliveNum()
{
pthread_mutex_lock(&mutexPool);
int aliveNum = this->liveNum;
pthread_mutex_unlock(&mutexPool);
return aliveNum;
}
(十)线程池析构
template <typename T>
ThreadPool<T>::~ThreadPool()
{
// 关闭线程池
shutdown = true;
// 阻塞回收管理者线程
pthread_join(managerID, NULL);
// 唤醒阻塞的消费者线程
for (int i = 0; i < liveNum; i++)
{
pthread_cond_signal(¬Empty);
}
// 释放堆内存
if (taskQ) delete taskQ;
if (threadIDs) delete[] threadIDs;
pthread_mutex_destroy(&mutexPool);
pthread_cond_destroy(¬Empty);
}
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
完整代码:
TaskQueue.h
#pragma once
#include <queue>
#include <pthread.h>
using callback = void (*)(void *);
// 任务结构体
template <typename T>
struct Task
{
Task<T>()
{
function = nullptr;
arg = nullptr;
}
Task<T>(callback f, void *arg)
{
this->arg = (T *)arg;
function = f;
}
callback function;
T *arg;
};
template <typename T>
class TaskQueue
{
public:
TaskQueue();
~TaskQueue();
// 添加任务
void addTask(Task<T> task);
void addTask(callback f, void *arg);
// 取出一个任务
Task<T> takeTask();
// 获取当前任务的个数
inline size_t taskNumber()
{
return m_taskQ.size();
}
private:
std::queue<Task<T> > m_taskQ;
pthread_mutex_t m_mutex;
};
TaskQueue.cpp
#include "TaskQueue.h"
#include <pthread.h>
template <typename T>
TaskQueue<T>::TaskQueue()
{
// 初始化互斥锁
pthread_mutex_init(&m_mutex, NULL);
}
template <typename T>
TaskQueue<T>::~TaskQueue()
{
// 销毁互斥锁
pthread_mutex_destroy(&m_mutex);
}
template <typename T>
void TaskQueue<T>::addTask(Task<T> task)
{
pthread_mutex_lock(&m_mutex);
m_taskQ.push(task);
pthread_mutex_unlock(&m_mutex);
}
template <typename T>
Task<T> TaskQueue<T>::takeTask()
{
Task<T> t;
pthread_mutex_lock(&m_mutex);
if (!m_taskQ.empty())
{
t = m_taskQ.front();
m_taskQ.pop();
}
pthread_mutex_unlock(&m_mutex);
return t;
}
template <typename T>
void TaskQueue<T>::addTask(callback f, void *arg)
{
pthread_mutex_lock(&m_mutex);
m_taskQ.push(Task<T>(f, arg));
pthread_mutex_unlock(&m_mutex);
}
ThreadPool.h
#pragma once
#include "TaskQueue.h"
template <typename T>
class ThreadPool
{
public:
// 创建线程池并初始化
ThreadPool(int min, int max);
// 销毁线程池
~ThreadPool();
// 给线程池添加任务
void addTask(Task<T> task);
// 获取线程池中工作的线程的个数
int getBusyNum();
// 获取线程池中活着的线程个数
int getAliveNum();
private:
// 工作的线程是(消费者线程)任务函数
static void *worker(void *arg);
// 管理者线程任务函数
static void *manager(void *arg);
// 单个线程退出
void threadExit();
private:
// 任务队列
TaskQueue<T> *taskQ;
pthread_t managerID; // 管理者线程ID
pthread_t *threadIDs; // 工作的线程ID
int minNum; // 最小线程数量
int maxNum; // 最大线程数量
int busyNum; // 忙的线程的个数
int liveNum; // 存活的线程的个数
int exitNum; // 要销毁的线程个数
pthread_mutex_t mutexPool; // 锁整个的线程池
pthread_cond_t notEmpty; // 任务队列是不是空了
bool shutdown = false; // 是不是要销毁线程池
static const int NUMBER = 2;
};
ThreadPool.cpp
#include "ThreadPool.h"
#include <iostream>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
template <typename T>
ThreadPool<T>::ThreadPool(int min, int max)
{
do
{
// 实例化任务队列
taskQ = new TaskQueue<T>;
if (taskQ == nullptr)
{
std::cout << "new taskQ fail..." << std::endl;
break;
}
// 根据线程的最大上限给线程数组分配内存
threadIDs = new pthread_t[max];
if (threadIDs == nullptr)
{
std::cout << "new threadIDs fail..." << std::endl;
break;
}
// 初始化
memset(threadIDs, 0, sizeof(pthread_t) * max);
minNum = min;
maxNum = max;
busyNum = 0;
liveNum = min; // 和最小个数相等
exitNum = 0;
// 初始化互斥锁,条件变量
if (pthread_mutex_init(&mutexPool, NULL) != 0 ||
pthread_cond_init(¬Empty, NULL) != 0)
{
std::cout << "mutex or condition init fail...\n"
<< std::endl;
break;
}
shutdown = false;
/// 创建线程 //
//"void *(ThreadPool::*)(void *arg)" 类型的实参与 "void *(*)(void *)" 类型的形参不兼容
// 类的静态成员或者类的外部的非类普通函数,只要定义出来之后就有地址
// 如果是类的成员函数,并且不是静止的,这个函数定义出来是没有地址的
// 什么时候有地址呢?我们必须要给这个类进行实例化,创建类的对象,这个
// 函数才有地址
// this指针指向当前被实例化的对象,如果要是在外边new出来一个ThreadPool对象,
// 那么这个this指针就指向该实例化对象
// 为什么要把this传给manager呢?
// 因为manager是一个静态方法,静态方法它只能访问类里边的静态成员变量,
// 它不能访问类的非静态成员变量。
// 因此如果我们想要访问这些非静态成员变量,就必须要给这个静态方法传进去一个
// 实例化对象,通过传进去的这个实例化对象来访问里边的非静态成员函数和变量
// 创建管理者线程,1个
pthread_create(&managerID, NULL, manager, this);
// 根据最小线程个数,创建线程
for (int i = 0; i < min; i++)
{
pthread_create(&threadIDs[i], NULL, worker, this);
}
return;
} while (0);
// 释放资源
if (threadIDs)
delete[] threadIDs;
if (taskQ)
delete taskQ;
}
// 工作线程任务函数
template <typename T>
void *ThreadPool<T>::worker(void *arg)
{
ThreadPool *pool = static_cast<ThreadPool *>(arg);
// 一直不停的工作
while (true)
{
// 访问任务队列(共享资源)加锁
pthread_mutex_lock(&pool->mutexPool);
// 当前任务队列是否为空,如果为空工作线程阻塞
while (pool->taskQ->taskNumber() == 0 && !pool->shutdown)
{
//printf("thread %ld waiting...\n",pthread_self());
// 阻塞工作线程
pthread_cond_wait(&pool->notEmpty, &pool->mutexPool);
// 解除阻塞之后,判断是不是要销毁线程
if (pool->exitNum > 0)
{
pool->exitNum--;
if (pool->liveNum > pool->minNum)
{
pool->liveNum--;
pthread_mutex_unlock(&pool->mutexPool);
pool->threadExit();
}
}
}
// 判断线程池是否被关闭了
if (pool->shutdown)
{
pthread_mutex_unlock(&pool->mutexPool);
pool->threadExit();
}
// 从任务队列中取出一个任务
Task<T> task = pool->taskQ->takeTask();
// 工作的线程+1
pool->busyNum++;
// 线程池解锁
pthread_mutex_unlock(&pool->mutexPool);
// 执行任务
printf("thread %ld start working...\n", pthread_self());
// std::cout<<"thread "<<to_string(pthread_self()) <<" start working..."<<std::endl;
task.function(task.arg);
delete task.arg;
task.arg = nullptr;
// 任务处理结束
printf("thread %ld end working...\n", pthread_self());
// std::cout<<"thread "<<std::string to_string(pthread_self()) <<" end working..."<<std::endl;
pthread_mutex_lock(&pool->mutexPool);
pool->busyNum--;
pthread_mutex_unlock(&pool->mutexPool);
}
return nullptr;
}
//线程退出
template <typename T>
void ThreadPool<T>::threadExit()
{
pthread_t tid = pthread_self();
for (int i = 0; i < maxNum; ++i)
{
if (threadIDs[i] == tid)
{
threadIDs[i] = 0;
// std::cout<<"threadExit() called, "<<std::string to_string(tid)<<" exiting..."<<std::endl;
printf("threadExit() called,%ld exiting...\n", tid);
break;
}
}
pthread_exit(NULL); // 这个是标准C的函数
}
// 管理者线程任务函数
template <typename T>
void *ThreadPool<T>::manager(void *arg)
{
ThreadPool *pool = static_cast<ThreadPool *>(arg);
// 如果线程池没有关闭,就一直检测
while (!pool->shutdown)
{
// 每隔3s检测一次
sleep(3);
// 取出线程池中任务的数量和当前线程的数量 取出(工作的)忙的线程的数量
pthread_mutex_lock(&pool->mutexPool);
int queueSize = pool->taskQ->taskNumber();
int liveNum = pool->liveNum;
int busyNum = pool->busyNum;
pthread_mutex_unlock(&pool->mutexPool);
// 添加线程
// 任务的个数>存货的线程个数 && 存活的线程数 < 最大线程数
if (queueSize > liveNum && liveNum < pool->maxNum)
{
pthread_mutex_lock(&pool->mutexPool);
int counter = 0;
for (int i = 0; i < pool->maxNum && counter < NUMBER && pool->liveNum < pool->maxNum; ++i)
{
if (pool->threadIDs[i] == 0)
{
pthread_create(&pool->threadIDs[i], NULL, worker, pool);
counter++;
pool->liveNum++;
}
}
pthread_mutex_unlock(&pool->mutexPool);
}
// 销毁多余的线程
// 忙的线程 * 2 < 存活的线程数 && 存活的线程 > 最小线程数
if (busyNum * 2 < liveNum && liveNum > pool->minNum)
{
pthread_mutex_lock(&pool->mutexPool);
pool->exitNum = NUMBER;
pthread_mutex_unlock(&pool->mutexPool);
// 让工作的线程自杀
for (int i = 0; i < NUMBER; ++i)
{
pthread_cond_signal(&pool->notEmpty);
}
}
}
return nullptr;
}
// 给线程池添加任务
template <typename T>
void ThreadPool<T>::addTask(Task<T> task)
{
if (shutdown)
{
return;
}
// 添加任务,不需要加锁,任务队列中有锁
taskQ->addTask(task);
// 唤醒工作的线程
pthread_cond_signal(¬Empty); // 通知消费者消费
}
template <typename T>
int ThreadPool<T>::getBusyNum()
{
pthread_mutex_lock(&mutexPool);
int busyNum = this->busyNum;
pthread_mutex_unlock(&mutexPool);
return busyNum;
}
template <typename T>
int ThreadPool<T>::getAliveNum()
{
pthread_mutex_lock(&mutexPool);
int aliveNum = this->liveNum;
pthread_mutex_unlock(&mutexPool);
return aliveNum;
}
template <typename T>
ThreadPool<T>::~ThreadPool()
{
// 关闭线程池
shutdown = true;
// 阻塞回收管理者线程
pthread_join(managerID, NULL);
// 唤醒阻塞的消费者线程
for (int i = 0; i < liveNum; i++)
{
pthread_cond_signal(¬Empty);
}
// 释放堆内存
if (taskQ) delete taskQ;
if (threadIDs) delete[] threadIDs;
pthread_mutex_destroy(&mutexPool);
pthread_cond_destroy(¬Empty);
}
main.cpp
#include <iostream>
#include <pthread.h>
#include "ThreadPool.h"
#include "ThreadPool.cpp"
#include "TaskQueue.cpp"
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
void taskFunc(void *arg)
{
int num = *(int *)arg;
printf("thread %ld is working, number = %d \n", pthread_self(), num);
sleep(1);
}
int main()
{
// 创建线程池
ThreadPool<int> pool(3, 10);
for (int i = 0; i < 100; ++i)
{
int *num = new int(i + 100);
*num = i + 100;
pool.addTask(Task<int>(taskFunc, num));
}
sleep(20);
return 0;
}
跟着这个老师的教程学习的,总结的笔记!!!
手写线程池 - C改C++版 | 爱编程的大丙 (subingwen.cn)https://subingwen.cn/linux/threadpool-cpp/