文章目录
- 前言
- 线程池架构组成
- **一、任务队列(Task Queue)**
- **二、工作线程组(Worker Threads)**
- **三、管理者线程(Manager Thread)**
- 系统协作流程图解
- 一、QRunnable
- 二、QThreadPool
- 三、线程池的应用场景
- Web服务器
- 开发中的常见场景
- 并行数据处理
- 延时异步任务
前言
在并发编程中,当我们使用线程时,通常的做法是在需要时创建一个新的线程。这种方法实现起来较为简便,但存在一个显著问题:如果并发线程数量较多,且每个线程仅执行一个耗时较短的任务,频繁地创建和销毁线程将显著降低系统效率,因为线程的创建和销毁本身需要消耗一定的时间。
那么,是否存在一种机制可以使线程在执行完一个任务后不被销毁,而是能够继续执行其他任务,从而实现线程的复用呢?
线程池(Thread Pool)正是这样一种多线程处理模式。在线程池中,任务被添加到队列中,线程池在创建线程后会自动启动这些任务。线程池中的线程均为后台线程,每个线程使用默认的堆栈大小,以默认的优先级运行,并处于多线程单元中。如果某个线程在托管代码中处于空闲状态(例如等待某个事件),线程池会插入另一个辅助线程以确保所有处理器保持忙碌状态。如果所有线程池线程都处于忙碌状态,但队列中仍有待处理的任务,线程池会在一段时间后创建另一个辅助线程,但线程总数不会超过预设的最大值。超过最大值的线程将被排队,等待其他线程完成任务后再启动。
线程池的概念在多种编程语言中均有体现,且许多语言直接提供了线程池的实现,开发者可以直接使用。接下来,我们将详细介绍线程池的实现原理。
线程池架构组成
线程池系统由三大核心组件协同构成,各组件功能职责如下:
一、任务队列(Task Queue)
- 功能定义
- 作为任务缓冲区,负责存储待处理的任务集合
- 采用先进先出(FIFO)调度策略,确保任务有序执行
- 操作接口
- 通过线程池API提供任务添加(
enqueue
)与删除(dequeue
)操作 - 已处理任务由系统自动移出队列
- 通过线程池API提供任务添加(
- 生产者角色
- 调用线程池API向队列提交任务的线程称为生产者线程(Producer Thread)
二、工作线程组(Worker Threads)
- 线程配置
- 维护固定数量(N)的常驻工作线程
- 作为任务队列的消费者(Consumer),持续执行以下操作:
- 从任务队列中提取待处理任务
- 执行任务处理逻辑
- 阻塞机制
- 当任务队列为空时,工作线程通过条件变量(Condition Variable)或信号量(Semaphore)进入阻塞状态
- 生产者提交新任务后触发唤醒机制,恢复工作线程执行
- 资源隔离性
- 工作线程仅关注任务执行,不参与线程池状态管理
三、管理者线程(Manager Thread)
- 监控职责
- 独立于任务处理流程,周期性执行以下检测:
- 任务队列的积压数量
- 当前处于忙碌状态的工作线程数
- 独立于任务处理流程,周期性执行以下检测:
- 动态调节策略
- 扩容机制:当任务负载超过阈值时,动态创建新增工作线程
- 缩容机制:当系统闲置率过高时,安全销毁冗余工作线程
- 设计目标
- 实现资源利用效率与任务吞吐量的平衡优化
系统协作流程图解
通过三者的协同配合,线程池实现了任务分发、资源调度与负载均衡的闭环管理,有效提升多线程环境下的任务处理效能与系统稳定性。
一、QRunnable
QRunnable类 常用函数不多,主要是设置任务对象传给线程池后,是否需要自动析构。
// 在子类中必须要重写的函数, 里边是任务的处理流程
[pure virtual] void QRunnable::run();
// 参数设置为 true: 这个任务对象在线程池中的线程中处理完毕, 这个任务对象就会自动销毁
// 参数设置为 false: 这个任务对象在线程池中的线程中处理完毕, 对象需要程序猿手动销毁
void QRunnable::setAutoDelete(bool autoDelete);
// 获取当前任务对象的析构方式,返回true->自动析构, 返回false->手动析构
bool QRunnable::autoDelete() const;
创建一个要添加到线程池中的任务类,处理方式如下:
class MyWork : public QObject, public QRunnable
{
Q_OBJECT
public:
explicit MyWork(QObject *parent = nullptr)
{
// 任务执行完毕,该对象自动销毁
setAutoDelete(true);
}
~MyWork();
void run() override{}
}
在上面的示例中MyWork类是一个多重继承,如果需要在这个任务中使用Qt的信号槽机制进行数据的传递就必须继承QObject这个类,如果不使用信号槽传递数据就可以不继承了,只继承QRunnable即可。
class MyWork :public QRunnable
{
Q_OBJECT
public:
explicit MyWork()
{
// 任务执行完毕,该对象自动销毁
setAutoDelete(true);
}
~MyWork();
void run() override{}
}
二、QThreadPool
Qt中的 QThreadPool 类管理了一组 QThreads, 里边还维护了一个任务队列。QThreadPool 管理和回收各个 QThread 对象,以帮助减少使用线程的程序中的线程创建成本。每个Qt应用程序都有一个全局 QThreadPool 对象,可以通过调用 globalInstance() 来访问它。也可以单独创建一个 QThreadPool 对象使用。
线程池常用的API函数如下:
// 获取和设置线程中的最大线程个数
int maxThreadCount() const;
void setMaxThreadCount(int maxThreadCount);
// 给线程池添加任务, 任务是一个 QRunnable 类型的对象
// 如果线程池中没有空闲的线程了, 任务会放到任务队列中, 等待线程处理
void QThreadPool::start(QRunnable * runnable, int priority = 0);
// 如果线程池中没有空闲的线程了, 直接返回值, 任务添加失败, 任务不会添加到任务队列中
bool QThreadPool::tryStart(QRunnable * runnable);
// 线程池中被激活的线程的个数(正在工作的线程个数)
int QThreadPool::activeThreadCount() const;
// 尝试性的将某一个任务从线程池的任务队列中删除, 如果任务已经开始执行就无法删除了
bool QThreadPool::tryTake(QRunnable *runnable);
// 将线程池中的任务队列里边没有开始处理的所有任务删除, 如果已经开始处理了就无法通过该函数删除了
void QThreadPool::clear();
// 在每个Qt应用程序中都有一个全局的线程池对象, 通过这个函数直接访问这个对象
static QThreadPool * QThreadPool::globalInstance();
一般情况下,我们不需要在Qt程序中创建线程池对象,直接使用Qt为每个应用程序提供的线程池全局对象即可。得到线程池对象之后,调用start()方法就可以将一个任务添加到线程池中,这个任务就可以被线程池内部的线程池处理掉了,使用线程池比自己创建线程的这种多种多线程方式更加简单和易于维护。
具体的使用方式如下:
mywork.h
class MyWork :public QRunnable
{
Q_OBJECT
public:
explicit MyWork();
~MyWork();
void run() override;
}
mywork.cpp
MyWork::MyWork() : QRunnable()
{
// 任务执行完毕,该对象自动销毁
setAutoDelete(true);
}
void MyWork::run()
{
// 业务处理代码
......
}
mainwindow.cpp
MainWindow::MainWindow(QWidget *parent) :
QMainWindow(parent),
ui(new Ui::MainWindow)
{
ui->setupUi(this);
// 线程池初始化,设置最大线程池数
QThreadPool::globalInstance()->setMaxThreadCount(4);
// 添加任务
MyWork* task = new MyWork;
QThreadPool::globalInstance()->start(task);
}
三、线程池的应用场景
Web服务器
在处理HTTP请求时,每个请求都可以作为一个独立的任务提交到线程池中,由线程池中的线程处理,这样做的好处是可以快速响应用户请求,同时复用线程资源
开发中的常见场景
异步任务处理
例如发送电子邮件、执行后台计算等这些都可以作为异步任务提交给线程池,从而不会阻塞主程序的执行。
并行数据处理
比如我从mysql中去取出10万条数据,结果集是一个List,分批次处理,每个批次1000条,在for循环中循环的次数就是批次的数目,每次循环会提交给线程池一个异步运算任务,比如这里会分为100个批次,那么会for循环100次提交100个异步运算任务,线程池中的线程会并行去处理这些批次的数据,然后再把每个处理后的批次组合为一个最终结果
延时异步任务
我需要开启一个异步任务,但这个异步任务需要等待30秒后再执行,最简单粗暴的方法是使用Thread.sleep方法,但这种的话会造成线程资源的浪费,高并发情况下就容易出现线程资源紧缺的问题。