文章目录
- 一、基础知识
- 1. 服务器编程基本框架
- 2. 五种 I/O 模型
- 3. 事件处理模式
- 4. 半同步/半反应堆模式
- 5. 线程池
- 二、代码解析
- 1. 线程池类定义
- 2. 线程池创建与回收
- 3. 向请求队列中添加任务
- 4. 线程处理函数
- 5. run 执行任务
- 参考文献
一、基础知识
1. 服务器编程基本框架
服务器的结构通常如上图,主要有 I/O 单元、逻辑单元和网络存储单元组成,每个单元之间通过请求队列进行通信,从而协同完成任务。
在项目中,I/O 单元用于处理客户端连接,读写网络数据;逻辑单元用于处理业务逻辑;网络存储单元用于存储本地数据库和文件。
模块 | 单个服务器程序 | 服务器机群 |
---|---|---|
I/O 处理单元 | 处理客户连接,读写网络数据 | 作为接入服务器,实现负载均衡 |
逻辑单元 | 业务进程或线程 | 逻辑服务器 |
网络存储单元 | 本地数据库、文件或缓存 | 数据库服务器 |
请求队列 | 各单元之间的通信方式 | 各服务器之间的永久 TCP 连接 |
2. 五种 I/O 模型
- 阻塞 I/O :调用函数后,必须等待调用函数返回后才进行下一步操作;
- 非阻塞 I/O :调用函数后,每隔一段时间检测 I/O 事件是否就绪,若没有就绪则可以做其他事情;若就绪则继续执行后续代码。非阻塞 I/O 执行系统调用总是立即返回,而不管 I/O 事件是否就绪,需要根据 errno 区分具 I/O 事件是否就绪;
- 信号驱动 I/O :调用函数后,不再主动检测 I/O 事件是否就绪,而是等待事件完成或发生错误后进程收到的信号来判断 I/O 事件是否就绪,因此该方法是非阻塞的;
- I/O 复用:主要指 Linux 使用的 3 个函数,它们均能同时监听多个事件:
- select :会使进程阻塞,没有将文件描述符和事件进行绑定,它仅仅是一个文件描述符的集合,因此使用时需要遍历所有事件,来查找目标事件;
- poll :会使进程阻塞,将文件描述符和事件绑定,所有事件都被统一处理,使用时同样需要遍历所有事件,来查找目标事件;
- epoll :在内核中维护一个事件表,提供
epoll_ctl
方法来控制其中的事件(增、删、改),使用时只需要遍历就绪事件(前 n 个)。
- 异步 I/O :Linux 可以调用
aio_read
函数告诉内核描述字缓冲区指针和缓冲区大小、文件偏移及通知的方式,然后立即返回,当内核将数据拷贝到缓存区后在通知应用程序。
前四种 I/O 模型都是同步的,即内核向应用程序通知的是就绪事件,需要用户代码对其执行操作;异步 I/O 则是指内核像应用程序通知完成事件,即内核已完成 I/O 操作。
3. 事件处理模式
- reactor :主线程(I/O 处理单元)只负责监听文件描述符上是否有事件发生,有的话立即通知工作线程(逻辑单元),所有 I/O 具体的操作均在工作线程中完成,通常为同步的模型;
- proactor :主线程和内核负责处理 I/O 的具体操作(如读写数据),工作线程仅负责业务逻辑(如处理客户请求),通常为异步的。由于异步 I/O 模型并不成熟,因此项目中使用同步 I/O 模拟实现 proactor 模式。
同步 I/O 模型的工作流程如下:
- 主线程往 epoll 内核事件表注册 socket 上的读就绪事件;
- 主线程调用 epoll_wait 等待 socket 上有数据可读;
- 当 socket 上有数据可读,epoll_wait 通知主线程,此时主线程从 socket 循环读取数据,读完后将数据封装成一个请求对象插入请求队列;
- 睡眠在请求队列上的某个工作线程被唤醒,它获得请求对象并处理,然后往 epoll 内核事件表中注册 socket 上的写就绪事件;
- 主线程调用 epoll_wait 等待 socket 可写;
- 当 socket 上有数据可写,epoll_wait 通知主线程,此时主线程往 socket 上写入服务器处理客户请求的结果。
4. 半同步/半反应堆模式
项目中使用的并发模式指 I/O 处理单元与逻辑单元的协同方式,该模型是半同步/半异步模型的变体,同步指的是程序完全按照代码顺序执行,这是一个串行的方式;异步指的是程序的执行需要由系统事件驱动,也就是说多个事件(多段代码)同时进行。
半同步/半异步模式的工作流程如下:
- 异步线程用于处理 I/O 事件;
- 异步线程监听到客户请求后,就将其封装成请求对象并插入到请求队列中;
- 请求队列将通知某个工作线程(同步模式的线程,用于处理客户逻辑)来读取并处理该请求。
半同步/半反应堆模式的工作流程如下(proactor 模式):
- 主线程充当异步线程,负责监听所有 socket 上的事件;
- 若有新请求到来,主线程接收并得到新连接的 socket ,然后往 epoll 内核事件表中注册该 socket 上的读写事件;
- 如果该 socket 上有读写事件发生,主线程从 socket 上接收数据,并将数据封装成请求对象插入到请求队列中;
- 所有工作线程(同步模式的线程)睡眠在请求队列行,当有任务到来时,通过竞争获得任务的接管权。
5. 线程池
一般的任务中,当需要使用线程处理工作时,我们才构造一个线程出来,而当任务完成时则回收该线程防止占用资源。但是这样的方式在服务器中会造成很大的时间开销,因此可以采用线程池的方式用空间换时间:
- 池表示一组资源的集合,这组资源在服务器启动之初就被创建并初始化,称为静态资源;
- 在服务器运行阶段,如有任务需要使用线程,则可以直接从线程池中获取,无需动态分配;
- 当一个线程工作完成后,无需将其销毁,只需要放入池中即可。
二、代码解析
1. 线程池类定义
template <typename T>
class threadpool
{
public:
// 构造函数
// actor_model:工作模式
// connPool:数据库连接池指针
// thread_number:线程池中线程的数量
// max_request:请求队列中最多允许的、等待处理的请求的数量
threadpool(int actor_model, connection_pool *connPool, int thread_number = 8, int max_request = 10000);
// 析构函数
~threadpool();
// 将http请求插入请求队列
bool append(T *request, int state);
bool append_p(T *request);
private:
// 工作线程运行的函数,它不断从工作队列中取出任务并执行之
static void *worker(void *arg);
void run();
private:
int m_thread_number; // 线程池中的线程数
int m_max_requests; // 请求队列中允许的最大请求数
pthread_t *m_threads; // 描述线程池的数组,其大小为m_thread_number
std::list<T *> m_workqueue; // 请求队列
locker m_queuelocker; // 保护请求队列的互斥锁
sem m_queuestat; // 是否有任务需要处理
connection_pool *m_connPool; // 数据库连接池
int m_actor_model; // 模型切换
};
2. 线程池创建与回收
// 构造函数
template <typename T>
threadpool<T>::threadpool(int actor_model, connection_pool *connPool, int thread_number, int max_requests) : m_actor_model(actor_model), m_thread_number(thread_number), m_max_requests(max_requests), m_threads(NULL), m_connPool(connPool)
{
// 线程数和允许的最大请求数均小等于0,出错
if (thread_number <= 0 || max_requests <= 0)
throw std::exception();
// 初始化线程,分配id
m_threads = new pthread_t[m_thread_number];
if (!m_threads)
throw std::exception();
for (int i = 0; i < thread_number; ++i)
{
// 循环创建线程,1标识符,2线程属性(NULL为默认),3指定线程将运行的函数,4运行的参数
if (pthread_create(m_threads + i, NULL, worker, this) != 0)
{
delete[] m_threads;
throw std::exception();
}
// 将线程进行分离后,不用单独对工作线程进行回收,它在退出时会自行释放资源
if (pthread_detach(m_threads[i]))
{
delete[] m_threads;
throw std::exception();
}
}
}
// 析构函数
template <typename T>
threadpool<T>::~threadpool()
{
// 释放id空间即可,脱离线程会自行释放资源
delete[] m_threads;
}
3. 向请求队列中添加任务
// 向请求队列中添加任务
template <typename T>
bool threadpool<T>::append(T *request, int state)
{
// 互斥锁加锁
m_queuelocker.lock();
// 判断请求队列是否大于最大请求个数
if (m_workqueue.size() >= m_max_requests)
{
m_queuelocker.unlock();
return false;
}
// 设置HTTP请求状态
request->m_state = state;
// 向工作队列中添加任务
m_workqueue.push_back(request);
// 互斥锁解锁
m_queuelocker.unlock();
// 通过信号量提示有任务要处理
m_queuestat.post();
return true;
}
4. 线程处理函数
// 线程处理函数
template <typename T>
void *threadpool<T>::worker(void *arg)
{
// 将参数强制转为线程池类,调用成员方法
threadpool *pool = (threadpool *)arg;
// 内部访问私有成员函数run,完成线程处理
pool->run();
return pool;
}
5. run 执行任务
// 处理HTTP请求
template <typename T>
void threadpool<T>::run()
{
// 工作线程从请求队列中取出某个任务进行处理
while (true)
{
// P操作,取出一个任务,先处理信号量能保证线程一定能取到任务,虽然任务的顺序可能会变,但不影响
m_queuestat.wait();
// 互斥锁加锁
m_queuelocker.lock();
// 请求队列为空则继续循环
// 个人理解不需要该步骤,因为如果执行了,相当于取了一个工作线程信号量,但是没有取工作线程,出现bug
if (m_workqueue.empty())
{
m_queuelocker.unlock();
continue;
}
// 取第一个任务后互斥锁解锁
T *request = m_workqueue.front();
m_workqueue.pop_front();
m_queuelocker.unlock();
// 空请求,继续循环
if (!request)
continue;
// 模式1表示reactor
if (1 == m_actor_model)
{
// 读请求
if (0 == request->m_state)
{
// 读完缓存区内容或用户关闭连接,1表示缓存区正常读完
if (request->read_once())
{
// 设置improv
request->improv = 1;
// 从连接池中取出一个数据库连接
connectionRAII mysqlcon(&request->mysql, m_connPool);
// 处理http请求的入口
request->process();
}
// 未读完
else
{
request->improv = 1;
// 计时器标志
request->timer_flag = 1;
}
}
else
{
// 写请求
if (request->write())
{
request->improv = 1;
}
else
{
request->improv = 1;
request->timer_flag = 1;
}
}
}
// 模式2表示proactor
else
{
connectionRAII mysqlcon(&request->mysql, m_connPool);
request->process();
}
}
}
参考文献
[1] 最新版Web服务器项目详解 - 02 半同步半反应堆线程池(上)
[2] 最新版Web服务器项目详解 - 03 半同步半反应堆线程池(下)