文章目录
- 1. 前言
- 1.1 概念
- 1.2 应用场景
- 1.3 线程池的种类
- 1.4 线程池的通常组成
- 2. 代码示例
- 2.1 log.hpp
- 2.2 lockGuard.hpp
- ① pthread_mutex_t
- 2.3 Task.hpp
- 2.4 thread.hpp
- 2.5 threadPool.hpp
- ① 基本框架
- ② 成员变量
- ③ 构造函数
- ④ 其余功能函数:
- main.cc
- 结果演示
- 完整代码
1. 前言
1.1 概念
线程池是一种并发编程的解决方案(线程使用模式),它由一组工作线程和一个任务队列组成。
-
工作线程在初始化时被创建并持续运行,等待从任务队列中获取任务并执行。
-
当任务执行完成后,线程不会退出,而是继续保持运行状态,等待下一个任务的到来。
-
线程池不仅能够保证内核的充分利用,还能防止过分调度
1.2 应用场景
-
高并发服务器:在高并发的网络服务器中,每个客户端请求通常都需要创建一个新的线程来处理,并发量较高时会导致线程的频繁创建和销毁,浪费大量的系统资源。使用线程池可以复用已有的线程,降低线程创建和销毁的开销。
-
大规模数据处理:在需要大量计算的任务中,使用线程池可以将计算分配到多个线程中进行,提高计算效率。
-
定时任务:在定时任务中,可以将任务提交到线程池中,在预定时间执行任务,避免在定时任务到来时才创建新的线程,提高程序的响应速度。
-
图像、视频等多媒体处理:在多媒体处理中,通常需要对大量的数据进行处理,使用线程池可以将处理任务分配到多个线程中进行,提高处理效率。
-
批量请求处理:在需要批量处理请求的场景中,使用线程池可以将请求分配到多个线程中进行处理,提高处理效率和响应速度。
1.3 线程池的种类
根据线程池的实现方式和特性,可以将线程池分为以下几种类型:
-
固定大小线程池:固定大小线程池是最简单的线程池实现方式,它在初始化时创建一定数量的工作线程,这些线程会一直存在直到线程池销毁。
- 当有新的任务提交到线程池时,会将任务放入任务队列中,并由空闲的工作线程执行。这种线程池的优点是稳定可靠,缺点是无法动态调整线程数量。
-
动态大小线程池:动态大小线程池可以根据系统负载情况自动调整线程数量,以适应不同的并发量。
- 当有大量任务需要处理时,会自动增加线程数量,反之则会减少线程数量,从而节约系统资源。这种线程池的优点是能够自适应负载,缺点是实现较为复杂,容易出现线程数量过多或过少的情况。
-
定时线程池:定时线程池可以在指定时间执行任务,通常用于定时任务、定期检查等场景。
- 当任务提交到线程池后,会设置一个定时器,在指定的时间到达时执行任务。这种线程池的优点是精确可靠,缺点是对定时器进行管理需要额外的开销。
-
工作窃取线程池:工作窃取线程池是一种高效的线程池实现方式,它采用工作窃取算法,让空闲的线程从其他线程的任务队列中窃取任务,从而避免了任务分配不均的问题。这种线程池的优点是高效均衡,缺点是实现较为复杂。
1.4 线程池的通常组成
线程池通常由以下几个核心组件组成:
- 线程池管理器(ThreadPool Manager):负责创建和管理线程池的生命周期。
- 工作队列(Work Queue):用于存储待执行的任务,当线程池中的线程完成当前任务后,会从工作队列中获取新的任务进行处理。
- 线程池(Thread Pool):包含一组预先创建的线程,这些线程可用于执行任务。
- 任务(Task):要在线程池中执行的具体操作或代码逻辑。
2. 代码示例
下面我们会介绍线程池的代码示例,主要功能包括:
- 创建固定数量线程池,循环从任务队列中获取任务对象
- 获取到任务对象后,执行任务对象中的任务接口
2.1 log.hpp
- 首先对于
log.hpp
,是一个日志文件,用于在部分正常情况或者程序运行异常时输出信息。
日志文件完全根据需要编写,对于该文件就不再过多描述。
#pragma once
#include <iostream>
#include <cstdio>
#include <cstdarg>
// 宏定义 日志级别
#define DEBUG 0
#define NORMAL 1
#define WARNING 2
#define ERROR 3
#define FATAL 4
// 全局字符串数组 : 将日志级别映射为对应的字符串
const char *gLevelMap[] = {
"DEBUG",
"NORMAL",
"WARNING",
"ERROR",
"FATAL"
};
#define LOGFILE "./threadpool.log" // LOGFILE: 表示日志文件的路径
void logMessage(int level, const char* format, ...)
{
// 判断DEBUG_SHOW 是否定义,分别执行操作
#ifndef DEBUG_SHOW // 将日志级别映射为对应的字符串
if(level == DEBUG) return; // DEBUG_SHOW不存在 且 日志级别为 DEBUG时,返回
#endif
// DEBUG_SHOW存在 则执行下面的日志信息
char stdBuffer[1024];
time_t timestamp = time(nullptr);
// 将日志级别和时间戳格式化后的字符串将会被写入到 stdBuffer 缓冲区中
snprintf(stdBuffer, sizeof(stdBuffer), "[%s] [%ld] ", gLevelMap[level], timestamp);
char logBuffer[1024];
va_list args;
va_start(args, format);
vsnprintf(logBuffer, sizeof(logBuffer), format, args);
va_end(args);
FILE* fp = fopen(LOGFILE, "a");
}
2.2 lockGuard.hpp
在 lockGuard.hpp
中我们 实现了一个 需封装了互斥锁的Mutex类 和一个 实现自动加解锁的lockGuard类。
Mutex
类封装了pthread_mutex_t
类型的互斥锁lockGuard
类是一个RAII风格的加锁方式。
① pthread_mutex_t
pthread_mutex_t
是POSIX线程库提供的互斥锁类型。与C++标准库中的std::mutex
类似,pthread_mutex_t
也可以用于实现线程间的互斥访问。
详情接口可以看下图
通过这种方式,lockGuard对象的生命周期和锁的生命周期绑定在一起,可以确保在任何情况下都能保证锁的正确释放,避免死锁等问题
对于lockGuard.hpp,其中包含两个类, Mutex类(封装一个互斥锁) 与 lockGuard类(用于自动释放互斥锁)
Mutex类:
#pragma once
#include <mutex>
#include <pthread.h>
// Mutex 类是对 pthread_mutex_t 的封装
class Mutex
{
public:
Mutex(pthread_mutex_t *mtx):_pmtx(mtx)
{}
~Mutex()
{}
void lock() // 加锁
{
pthread_mutex_lock(_pmtx);
}
void unlock() // 解锁
{
pthread_mutex_unlock(_pmtx);
}
private:
pthread_mutex_t* _pmtx; // POSIX线程库提供的互斥锁类型
};
lockGuard类:
// RAII 加锁方式
// lockGuard 实现了自动释放互斥锁的效果
class lockGuard
{
public:
lockGuard(pthread_mutex_t *mtx):_mtx(mtx)
{
_mtx.lock();
}
~lockGuard()
{
_mtx.unlock();
}
private:
Mutex _mtx;
};
2.3 Task.hpp
- Task类即线程池的任务对象,这里实现的功能很简单,该任务类接收两个参数以及一个处理函数,比如传入 (1, 2 , ADD) 就可以进行相加的操作
#pragma once
#include <iostream>
#include <string>
#include <functional>
#include "log.hpp"
typedef std::function<int(int, int)> func_t; // 定义一个函数类型,用于传递给Task
class Task
{
public:
// 构造
Task(){}
Task(int x, int y, func_t func):_x(x),_y(y),_func(func)
{}
void operator()(const std::string& name)
{
// __FILE__ 和 __LINE__ : 预定义宏,用于获取当前文件名和行号。确保它们能够正确展开并提供有效的日志输出信息。
logMessage(WARNING, "%s 处理完成: %d+%d=%d | %s | %d",
name.c_str(), _x, _y, _func(_x, _y), __FILE__, __LINE__);
}
private:
// 成员变量:两个参数以及处理方法
int _x;
int _y;
func_t _func;
};
2.4 thread.hpp
该文件中 我们实现 一个用于创建和管理线程的简单封装类 Thread
,使用 POSIX 线程库(pthread)来实现线程的创建和管理。
其中包含两个类:
ThreadData
:用于存储线程的额外数据Thread
:用于创建和管理线程的简单封装类
ThreadData类:
- ThreadData 即用于存储线程的内容(所含变量)
#pragma once
#include <iostream>
#include <string>
#include <pthread.h>
#include <functional>
#include <cstdio>
typedef void *(*func_t)(void *);
// ThreadData: 存储线程的额外数据
class ThreadData
{
public:
void* _args; // 线程参数
std::string _name; // 线程名称
};
Thread类:
对于封装的Thread类,其包含以下功能:
- 创建
- 等待
- 获取名称
class Thread
{
public:
// 构造函数接受三个参数:num 表示线程编号,callback 是一个函数指针,表示线程要执行的函数,args 是传递给线程函数的参数。
Thread(int num, func_t callback, void* args):_func(callback) // 回调指针
{
// 线程编号和函数指针存储在成员变量 _name 和 _func 中,并初始化 _tData 对象的成员变量 _args 和 _name。
char nameBuffer[64]; // 存储线程名称
snprintf(nameBuffer, sizeof(nameBuffer), "Thread-%d", num); // 将格式化字符串 "Thread-%d" 和 num 参数合并,生成线程的名称
_name = nameBuffer; // 赋值名称
_tData._args = args;
_tData._name = _name;
}
~Thread()
{}
void start() // 创建线程
{
pthread_create(&_tid, nullptr, _func, (void*)&_tData);
}
void join() // 等待线程完成
{
pthread_join(_tid, nullptr);
}
std::string name() // 返回线程名称
{
return _name;
}
private:
std::string _name; // 线程名称
func_t _func; // 函数指针:存储线程要执行的函数
ThreadData _tData; // ThreadData变量,线程数据
pthread_t _tid; // 标识唯一线程
};
2.5 threadPool.hpp
该threadPool.hpp
文件主要封装了一个线程池类ThreadPool,作为重点,这里会对每一个函数单独讲解:
① 基本框架
下面的框架展示整个文件的基本内容,将具体的函数功能先省略:
#pragma once
#include <iostream>
#include <vector>
#include <queue>
#include <pthread.h>
#include <mutex>
#include "lockGuard.hpp"
#include "thread.hpp"
#include "log.hpp"
const int g_thread_num = 3; // 默认线程个数
// 线程池其本质是生产消费者模型
template<class T>
class ThreadPool
{
private:
// 构造函数
ThreadPool(int thread_num = g_thread_num):_num(thread_num)
{}
// 禁用拷贝 && 赋值
// 防止多个线程池对象共享同一个线程池内部资源而造成混乱
ThreadPool(const ThreadPool<T>& other) = delete;
const ThreadPool<T> &operator=(const ThreadPool<T> &other) = delete;
public:
pthread_mutex_t *getMutex()
{}
bool isEmpty() // 判断任务队列是否为空
{}
void waitCond() //等待条件变量
{}
T getTask() // 获取任务
{}
// 获取线程池实例
static ThreadPool<T> *getThreadPool(int num = g_thread_num)
{}
// 启动线程
void run()
{}
// 每个线程的执行函数: 消费者执行逻辑
static void* routine(void *args)
{}
// 添加任务
void pushTask(const T& task)
{}
private:
// 成员变量...
};
template <typename T>
ThreadPool<T> *ThreadPool<T>::thread_ptr = nullptr;
// PTHREAD_MUTEX_INITIALIZER 是一个宏: 用于初始化互斥锁的静态常量。
template <typename T>
pthread_mutex_t ThreadPool<T>::mutex = PTHREAD_MUTEX_INITIALIZER;
// 这样的定义和初始化可以确保在使用 ThreadPool<T> 类时, thread_ptr 和 mutex 这两个静态成员变量被正确创建和初始化。
// 静态成员变量是该类所有实例共享的,通过初始化为适当值,确保在使用时具有预期的初始状态。
② 成员变量
该ThreadPool类包含以下成员变量:
每个成员变量的具体作用都在注释中标出👇
private:
std::vector<Thread*> _threads; // 线程集合
std::queue<T> _task_queue; // 任务队列
int _num; // 表示线程数量
static ThreadPool<T>* thread_ptr; // 静态成员指针,用于保存 ThreadPool 类的唯一实例
static pthread_mutex_t mutex; // 静态互斥锁,用于保证对 ThreadPool 类唯一实例的访问的互斥性
pthread_mutex_t lock; // 互斥锁
pthread_cond_t cond; // 条件变量
③ 构造函数
条件变量是一种线程同步机制,用于等待或唤醒特定条件的线程。
- 在
pthread
库中,可以使用pthread_cond_init函数来初始化条件变量对象。该函数的原型如下:
int pthread_cond_init(pthread_cond_t* cond, const pthread_condattr_t* attr)
ThreadPool(int thread_num = g_thread_num):_num(thread_num)
{
// 初始化互斥锁与条件变量
pthread_mutex_init(&lock, nullptr);
pthread_cond_init(&cond, nullptr);
for(int i = 1; i <= _num; ++i)
_threads.push_back(new Thread(i, routine, this));
}
// 禁用拷贝 && 赋值
// 防止多个线程池对象共享同一个线程池内部资源而造成混乱
ThreadPool(const ThreadPool<T>& other) = delete;
const ThreadPool<T> &operator=(const ThreadPool<T> &other) = delete;
对于上面的部分函数进行解释:
互斥锁是一种线程同步机制,用于保护共享资源,避免多个线程同时访问而引起的竞态条件。在pthread库中,可以使用pthread_mutex_init函数来初始化互斥锁对象。
该函数的原型如下:
int pthread_mutex_init(pthread_mutex_t* mutex, const pthread_mutexattr_t* attr);
④ 其余功能函数:
getMutex()
- 返回互斥锁
pthread_mutex_t *getMutex()
{
return &lock; // 返回互斥锁指针
}
isEmpty()
- 判断任务队列是否为空
bool isEmpty() // 判断任务队列是否为空
{
return _task_queue.empty();
}
waitCond()
- 等待条件变量的信号
- 在调用该函数之前,需要先获得互斥锁,并将其传递给函数,以确保线程在等待条件时不会被其他线程同时访问共享资源。
void waitCond() //等待条件变量
{
pthread_cond_wait(&cond, &lock);
}
getTask()
- 用于线程获取队列
T getTask() // 获取任务
{
T t = _task_queue.front(); // 获取队列首位元素(任务)
_task_queue.pop(); // 删除该任务
return t;
}
getThreadPool()
- 获取线程池实例
// 获取线程池实例
static ThreadPool<T> *getThreadPool(int num = g_thread_num)
{
// 判断是否已经创建了线程池实例
if(thread_ptr == nullptr)
{
// 多个线程可能同时访问 thread_ptr,需要通过互斥锁来保护对其的操作
lockGuard lockguard(&mutex);
// 为了避免在多个线程被阻塞等待锁资源时,当第一个线程创建了线程池后,其他线程仍然会创建新的线程池 的情况
// 再次判断
if(thread_ptr == nullptr)
{
thread_ptr = new ThreadPool<T>(num);
}
}
return thread_ptr;
}
run()
- 用于启动线程池中的线程
// 启动线程
void run()
{
// 遍历线程池中的每个线程并依次创建
for(auto& iter : _threads)
{
iter->start();
logMessage(NORMAL, "%s %s", iter->name().c_str(), "Creation completed");
}
}
rountine()
- 线程内部的执行逻辑:
- 获取传来的参数(线程数据)后,在循环中建立任务对象:
- 依次进行等待条件变量、读取任务
// 每个线程的执行函数: 消费者执行逻辑
static void* routine(void *args)
{
ThreadData* td = (ThreadData*)args;
ThreadPool<T>* tp = (ThreadPool<T>*)td->_args;
while(true)
{
T task;
{
// 初始化 task 对象,并调用适当的构造函数
lockGuard lockguard(tp->getMutex());
while(tp->isEmpty()) // 当线程为空,消费操作等待
tp->waitCond();
// 读取任务
task = tp->getTask();
}
// 执行任务
task(td->_name);
}
}
pushTask()
- 用于向任务队列中添加任务
// 添加任务
void pushTask(const T& task)
{
lockGuard lockguard(&lock);
_task_queue.push(task); // 将任务传入到 任务队列
pthread_cond_signal(&cond); // 等待条件变量的线程,有新的任务可用
}
main.cc
- 用于形成最后等待可执行文件,
- 具体为:通过任务类生成一批 题目,并将任务推送给线程池
#include "threadPool.hpp"
#include "Task.hpp"
#include <iostream>
#include <ctime>
#include <unistd.h>
#include <pthread.h>
int main()
{
srand((unsigned long)time(nullptr) ^ getpid()); // 设置随机数种子
ThreadPool<Task>::getThreadPool()->run(); //
while(true)
{
int x = rand() % 100 + 1;
usleep(1000);
int y = rand() % 30 + 1;
Task t(x, y, [](int x, int y)->int{
return x + y;
});
logMessage(DEBUG, "create task success: %d + %d = ?", x, y);
logMessage(DEBUG, "create task success: %d + %d = ?", x, y);
logMessage(DEBUG, "create task success: %d + %d = ?", x, y);
logMessage(DEBUG, "create task success: %d + %d = ?", x, y);
// 推送任务到线程池
ThreadPool<Task>::getThreadPool()->pushTask(t);
sleep(1);
}
return 0;
}
结果演示
最后可以呈现出类似如下的效果:
ThreadPool started.
create task success: 43 + 18 = ?
create task success: 12 + 27 = ?
create task success: 55 + 8 = ?
create task success: 92 + 5 = ?
Waiting for tasks...
create task success: 37 + 21 = ?
create task success: 67 + 29 = ?
create task success: 89 + 14 = ?
create task success: 25 + 3 = ?
完整代码
上文涉及到的代码完整代码如下👇 :
ThreadPool代码实例