写过CURD的程序员肯定都熟悉“数据库连接池”这个东西,数据库连接是个昂贵的东西,因为它的创建过程比较耗费资源。所以为了节约资源、提高数据库操作的性能,“数据库连接池”就应运而生了。
其实线程池跟数据库连接池类似,一个线程的创建和销毁也需要耗费资源,如果反复地创建和删除线程,那么资源的浪费也是很可观的。所以,预先创建一些线程,在有任务的时候分配给相应的线程去执行,对提高系统的性能有非常积极的作用。
一、线程池的创建过程
1、创建线程
for (size_t i = 0; i < threadCount; ++i) {
threads.emplace_back(threadFunc, this);
}
threads是个vector,我们可以先创建一些线程,并把这些线程放到一个vector中。
2、任务队列与调度
void ThreadPool::addTask(const Task& task) {
{
lock_guard<mutex> lock(queueMutex);
taskQueue.emplace(task);
}
condition.notify_one();
}
新进来的任务会被加到taskQueue队列中,你可以根据一定的策略从taskQueue中取出任务,交给某个空闲的线程去执行。
3、线程执行及回收
void ThreadPool::threadFunc() {
while (true) {
Task task;
{
unique_lock<mutex> lock(queueMutex);
//wait()用来等一个东西
//1. 如果第2个参数为false,就跟只有1个参数一样,直接阻塞到本行
//阻塞到什么时候为止呢?有其他持有锁的线程notify()后.
//2. 当wait()被notify()唤醒后,会先判断第2个参数是否为true,如果为true才会
//继续往下执行.
condition.wait(lock, [this]() { return !taskQueue.empty() || terminate; });
if (terminate && taskQueue.empty()) {
break;
}
task = taskQueue.front();
taskQueue.pop();
}
task(); // Execute the task.
}
}
4、线程池的优雅终止
在多线程程序中,如何终止线程是一个需要“慎重”考虑的问题,你不能直接给stop了,因为这会导致某个正在运行的线程被硬生生地中断,会导致内部数据的不一致!
我们需要先添加一个标志:terminate,当terminate标记为true时,先把线程池的while任务给停了,然后唤醒所有等待中的线程,使用std::thread::join()函数等待线程执行完毕。
ThreadPool::~ThreadPool() {
terminate = true;
condition.notify_all(); // 唤醒所有等待中的线程
for (thread& th : threads) {
if (th.joinable()) {
th.join(); // 等待线程执行完毕
}
}
}
二、一个线程池的例子
#include <vector>
#include <queue>
#include <thread>
#include <iostream>
#include <stdexcept>
#include <condition_variable>
#include <memory>
#define MAX_THREADS 8 //最大线程数目
class Task{
public:
void process(){
}
};
class ThreadPool {
public:
ThreadPool() = default;
ThreadPool(const ThreadPool &) = delete;
ThreadPool(ThreadPool &&) = delete;
ThreadPool &operator = (const ThreadPool &) = delete;
ThreadPool &operator = (ThreadPool &&) = delete;
//初始化所有线程并启动线程
void init(int threadCount){
if (threadCount <= 0 || threadCount > MAX_THREADS) {
throw std::exception();
}
for (int i = 0; i < threadCount; i++) {
// emplace_back不能先构造再插入.
// 方法2: std::thread temp(worker, this); work_threads.push(temp);
work_threads.emplace_back(worker, this);
}
}
bool dispatch(std::shared_ptr<Task>&& request){
// 操作工作队列时一定要加锁, 因为他被所有线程共享
std::unique_lock<std::mutex> lock(queue_mutex);
lock.lock();
tasks_queue.push(request);
lock.unlock();
// 线程池添加进去了任务, 通知等待的线程
condition.notify_one();
return true;
}
~ThreadPool(){
std::unique_lock<std::mutex> lock(queue_mutex);
stop = true;
condition.notify_all();
for (auto& w : work_threads) {
w.join();
}
}
private:
// 工作线程需要运行的函数,不断的从任务队列中取出并执行
static void* worker(void* arg){
ThreadPool* pool = (ThreadPool*)arg;
pool->run();
return pool;
}
void run(){
while (!stop)
{
// unique_lock() 出作用域会自动解锁
std::unique_lock<std::mutex> lk(this->queue_mutex);
// 如果任务队列不为空,就停下来等待唤醒
this->condition.wait(lk, [this] {
return !this->tasks_queue.empty();
});
std::shared_ptr<Task> request = tasks_queue.front();
tasks_queue.pop();
if (request) {
request->process();
}
}
}
private:
// 工作线程
std::vector<std::thread> work_threads;
// 任务队列
std::queue<std::shared_ptr<Task>> tasks_queue;
std::mutex queue_mutex;
std::condition_variable condition;
bool stop = false;
};
int main(){
ThreadPool pool;
pool.init(4);
return 0;
}
参考:
(1)基于C++11实现线程池
(2)轻松掌握C++线程池:从底层原理到高级应用