文章目录
- 一、项目介绍
- 1. 项目简介
- 2. 开发环境
- 3. 核心技术
- 4. 开发阶段
- 二、前置知识了解
- 1. reactor
- 2. timerfd
- 3. timerwheel
- 4. eventfd
- 5. regex
- 6. any
- 三、框架设计
- 1. 项目模块划分
- 1.1 SERVER 模块
- 1.2 协议模块
- 2. 项目模块关系图
- 2.1 Connection 模块关系图
- 2.2 Acceptor 模块关系图
- 2.3 EventLoop 模块关系图
- 四、SERVER 模块开发
- 1. Buffer 模块
- 2. Socket 模块
- 3. Channel 模块
- 4. Poller 模块
- 5. EventLoop 模块
- 6. TimerQueue 模块
- 7. Connection 模块
- 8. Acceptor 模块
- 9. LoopThread 模块
- 10. LoopThreadPool 模块
- 11. TcpServer 模块
- 12. TcpServer 简单测试
- 五、HTTP 协议模块开发
- 1. Util 模块
- 2. HttpRequest 模块
- 3. HttpResponse 模块
- 4. HttpContext 模块
- 5. HttpServer 模块
- 6. HttpServer 简单测试
- 六、功能测试
- 1. 服务器长连接测试
- 2. 服务器超时连接测试
- 3. 服务器错误请求测试
- 4. 服务器业务处理超时测试
- 5. 服务器同时多条请求测试
- 6. 服务器大文件传输测试
- 7. 服务器性能压力测试
一、项目介绍
1. 项目简介
本项目主要是模仿 muduo 库实现一个以主从 Reactor 为模型,以 OneThreadOneEventLoop 为事件驱动的高并发服务器组件。通过这个服务器组件,我们可以简洁快速的搭建出一个高性能的 TCP 服务器。并且组件内部会提供不同的应用层协议支持,组件使用者可以通过这些协议快速的完成一个应用服务器的搭建。
muduo 源码 – https://github.com/chenshuo/muduo/tree/master/muduo
muduo 介绍 – https://www.cyhone.com/articles/analysis-of-muduo/
2. 开发环境
本项目的开发环境如下:
- Linux:在 Centos7.6 环境下进行开发环境搭建与项目部署。
- VSCode/Vim:通过 VSCode 远程连接服务器或直接使用 Vim 进行代码编写与功能测试。
- g++/gdb:通过 g++/gdb 进行代码编译与调试。
- Makefile:通过 Makefile 进行项目构建。
3. 核心技术
- C++11/C++17:使用 C++11/C++17 中的某些新特性完成代码的编写,例如 function/bind/shared_ptr/mutex/regex/any。
- Linux 系统编程:使用 Linux 相关系统调用完成代码编写,例如 read/write/timerfd/eventfd/epoll。
- Linux 网络编程:使用 Socket 相关接口实现网络通信,例如 socket/bind/listen/accept/recv/send/setsockopt。
- HTML:编写简单 HTML 页面进行功能测试。
4. 开发阶段
本项目一共分为四个开发阶段:
- 前置知识了解:对项目中需要用到的一些知识进行了解,学会它们的基本使用;比如 Reactor 模式,Linux 中的定时器与事件通知,C++中的正则表达式与通用容器 Any 等。
- 框架设计:进行项目模块划分,确定每一个模块需要实现的功能。
- 模块开发:对项目的各个模块进行开发与基本功能测试。
- 功能测试:对最终搭建出来的应用服务器进行各种功能测试,包括边缘功能测试以及服务器压力测试。
二、前置知识了解
1. reactor
Reactor 模式是指一个或多个客户端同时向服务器发送请求,进行业务处理的事件驱动处理模式。即服务器与客户端建立连接之后,哪个客户端给服务器发送了数据、触发了事件,服务器就对哪个客户端进行处理 (接收数据 + 处理数据 + 发送响应)。Reactor 模式的实现依赖于 I/O多路复用
技术。
Reactor模式分为单Reactor单线程、单Reactor多线程以及多Reactor多线程等不同的类别,具体如下:
单Reactor单线程:
-
思想:在单个线程中进行事件监控与处理;即通过IO多路复用模型进行客户端请求监控,触发事件后进行事件处理:
- 如果是新建连接请求,则获取新建连接,并添加至多路复用模型进行事件监控。
- 如果是数据通信请求,则进行对应数据处理(接收数据 + 处理数据 + 发送响应)。
上述所有操作都在服务器的一个线程中完成。
-
优点:所有操作都是串行化的,不用担心进程/线程间的通信与安全问题,同时编码流程也较为简单。
-
缺点:单线程操作容易造成性能瓶颈,导致客户端请求超时。
单Reactor多线程:
- 思想:一个Reactor线程 + 一个业务线程池;即Reactor线程通过IO多路复用模型进行客户端请求监控,触发事件后进行事件处理:
- 如果是新建连接请求,则获取新建连接,并添加至多路复用模型进行事件监控。
- 如果是数据通信请求,则接收数据后分发给业务线程池进行业务外理。
- 业务线程处理完些后,再将响应交给Reactor线程进行数据响应。
- 优点:充分利用了CPU多核资源,提高了处理效率,同时降低了代码耦合度。
- 缺点:多线程间的数据共享访问控制较为复杂,在单个Reactor线程中进行所有客户端的事件监控与I/O操作,不利于高并发场景 – 当短时间内有大量客户端连接时,服务器来不及进行新的客户端连接处理。
多Reactor多线程(主从Reactor):
-
思想:主Reactor线程(获取新连接)+ 从属Reactor线程池(事件监控与IO处理)+ 业务线程池 (业务处理);
基于单Reactor多线程的缺点,将Reactor线程分为主Reactor线程与从属Reactor线程:
- 主Reactor线程专门处理新连接请求事件,有新连接到来则将其分发到从属Reactor线程池中进行事件监控与IO处理。
- 从属Reactor线程用于客户端的通信事件监控,当客户端通信事件触发时,接收客户端数据并分发给业务线程池进行业务处理。
-
优点:充分利用了CPU多核资源,并且可以进行合理分配,适用于高并发场景。
-
注意:执行流并不是越多越好,因为执行流越多,CPU切换调度的成本越高,所以在某些主从Reactor模型中,并没有单独的业务线程池,而是将业务处理直接放到从属Reactor线程池中完成的,一切取决于业务场景。
目标定位:
- 本项目要实现的是主从Reactor模型下的 OneThreadOneEventLoop 式并发服务器组件,也就是主Reactor线程仅仅监控监听描述符,获取客户端新建连接,然后将获取到新连接后分发给从属Reactor进行通信事件监控,保证获取新连接的高效性,提高服务器的并发性能。
- 而从属Reactor线程负责客户端的通信事件监控、IO处理以及业务处理操作。
- 其中 OneThreadOneEventLoop 的思想就是把一个事件所有的操作都放到一个线程中进行,一个线程对应一个事件处理的循环。
- 最后,在当前实现中,由于并不确定组件使用者的使用意向,因此并不提供业务层工作线程池的实现,只实现主从Reactor,业务线程池可由组件库的使用者的根据需要自行决定是否使用和实现。
2. timerfd
由于服务器的资源是有限的,为了避免某些客户端连接上来之后一直不通信而平白浪费服务器资源的情况,我们需要对非活跃连接设置定时销毁,而实现这个功能的前提是得有一个定时器。
timerfd 是 Linux 给我们提供的定时器,它主要包括 timerfd_create (创建定时器) 和 timerfd_settime (启动定时器) 两个函数:
#include <sys/timerfd.h>
int timerfd_create(int clockid, int flags);
clockid:
CLOCK_REALTIME - 表示系统实时时间,如果定时器运行过程中修改了系统时间就会出现问题
CLOCK_MONOTONIC - 从系统开机到现在的时间,是⼀种相对时间
flags: 0 - 默认阻塞
返回值:timerfd的操作和普通文件操作是一样的,因此timerfd_create返回的时一个文件描述符
int timerfd_settime(int fd, int flags, struct itimerspec *new, struct itimerspec *old);
fd: timerfd_create 返回的⽂件描述符
flags: 0 - 表示使用相对时间, 1 - 表示使用绝对时间;默认设置为0即可
new: 用于设置定时器的新超时时间
old: ⽤于接收原来的超时时间,不关心直接设置为NULL即可
返回值:启动成功返回0,失败返回-1
// timerfd 中用于设置时间的两个结构体
struct timespec {
time_t tv_sec; /* 秒 */
long tv_nsec; /* 纳秒 */
};
struct itimerspec {
struct timespec it_interval; /* 第⼀次超时之后每次超时的间隔时间 */
struct timespec it_value; /* 第⼀次超时时间 */
};
需要注意的是,timerfd 会在每次超时时,自动给 fd 中写入8字节的数据,表示从上⼀次读取数据到当前读取数据期间超时了多少次,所以我们在读取 fd 文件内容时需要一次读取8字节。
下面是 timerfd 的简单使用案例:
#include <stdio.h>
#include <stdint.h>
#include <unistd.h>
#include <sys/timerfd.h>
int main()
{
// 创建定时器
int timefd = timerfd_create(CLOCK_MONOTONIC, 0);
if(timefd < 0) {
printf("timerfd create error\n");
return 1;
}
// 设置首次超时时间
struct itimerspec itime;
itime.it_value.tv_sec = 3;
itime.it_value.tv_nsec = 0;
// 设置后续超时时间间隔
itime.it_interval.tv_sec = 1;
itime.it_interval.tv_nsec = 0;
// 启动定时器
int n = timerfd_settime(timefd, 0, &itime, NULL);
if (n == -1) {
printf("timerfd settime error\n");
return 1;
}
// 从timefd中读取超时次数
while(1) {
// 注意一次读取8字节数据
uint64_t times;
int n = read(timefd, ×, 8);
if (n < 0) {
printf("read error\n");
return 1;
}
printf("超时次数:%ld\n", times);
}
return 0;
}
3. timerwheel
我们上面学习了 Linux 中定时器的使用,但是它存在一个很大的缺陷 – 我们每次超时都需要将所有的连接遍历一遍,如果有上万个连接,那么效率无疑是较为低下的。
针对这个问题,我们可以以连接最近一次通信的系统时间为基准建立一个小跟堆,然后就只需要不断取出堆顶已超时的连接执行超时任务,直到没有超时连接即可。
不过在这里我们选择另外一个解决思路 – 时间轮。时间轮的思想来源于钟表,如果我们定了一个3点钟的闹铃,则当时针走到3的时候,就代表时间到了;同样的道理,如果我们定义了一个数组,同时有一个指针指向数组起始位置,此时我们只需要让这个指针每秒钟向后走动一步,走到哪里,则代表哪里的任务该被执行了。比如我们想要定一个3s 后的任务,则只需要将定时任务添加到 tick+3 位置,由于指针每秒针走一步,因此三秒钟后 tick 走到对应位置,这时候执行对应位置的超时连接销毁任务即可。
但是这里还存在几个问题:
-
如果我们的超时时间很长应该怎么办呢?
比如我们的超时时间为一天,我们是不是要定义一个 60 * 60 * 60s 的数组?解决办法很简单,我们可以将时间轮分级,即分别定义秒级时间轮、分级时间轮以及时级时间轮,如下:
此时我们仅需要 3 * 60 个整形的空间就可以实现 60 小时内的定时器了 (如果使用位图来定义定时轮仅需要 4*3 个字节的空间),而且就算我们要更大的时间长度,也只需要再加一个月级、年级时间轮即可。
-
如果同一时刻需要添加多个定时任务怎么办?
如果同一时刻或者同一时间段内有多个新连接到来,此时我们需要分别为它们添加定时销毁任务,但是时间轮 i 下标只有一个空间,此时应该怎么办呢?其实也很简单,只需要将时间轮定义为二维数组即可,固定时间轮的行数为60,当 i 位置需要添加定时任务时直接 timerwheel[i].push_back(task()) 就行了。
-
如何延迟定时任务?
假设当一个连接 30s 内没有IO事件发生,我们就认为它是非活跃连接,需要执行其定时任务进行销毁,那么如果该连接在定时任务添加后的 10s 时进行了 IO,此时我们应该将它的定时时间重新设置为 30s,那么如何做到呢?答案是借助 C++ 的类以及智能指针。
首先,我们使用一个类对定时任务进行封装并将定时任务的执行放到类的析构函数中执行,此时类实例化的每一个对象就是一个定时任务对象,并且只有当对象被销毁的时候才会去执行定时任务。
然后,我们使用 shared_ptr 来管理 new 出来的定时任务对象,由于 shared_ptr 内部存在一个计数器,只有当计数器为0的时候,才会释放所管理的对象,所以我们可以每次连接有 IO 事件发生时,就创建一个该连接对应的管理定时任务对象的智能指针,并将其添加到定时任务中,此时即使 20s 后该连接最开始添加的定时任务时间到了也只是 shared_ptr 的计数 --,其对应的定时任务对象并不会被释放,那么对象的析构函数也不会被执行,真正的定时任务也不会被执行了。
最后,关于 shared_ptr 的使用还有一个很重要的细节需要注意 – 对于一个定时任务对象,我们直接使用它来构造智能指针,则管理该对象的其他智能指针的引用计数并不会增加,只有当使用该对象的智能指针来构造新的智能指针时,则智能指针相互的引用计数才会增加,如下图所示:
为了解决这个问题,我们需要使用 weak_ptr 来管理原始的定时任务对象资源,然后再使用 weak_ptr 来构造 shared_ptr (weak_ptr.lock()) 用于加入时间轮中,保证 shared_ptr 的引用计数增加,最后再释放 weak_ptr 即可。(weak_ptr 本身不会导致 shared_ptr 的引用计数增加)
timerwheel 的实现如下:
using TaskFunc_t = std::function<void()>;
using ReleaseFunc_t = std::function<void()>;
/*定时任务*/
class TimerTask {
public:
TimerTask(uint64_t id, uint32_t timeout, const TaskFunc_t &task_cb)
: _id(id), _timeout(timeout), _cancelled(false), _task_cb(task_cb)
{}
/*设置release_cb回调函数*/
void SetRelease(const ReleaseFunc_t &release_cb) { _release_cb = release_cb; }
/*返回定时器任务超时时间*/
uint64_t Timeout() { return _timeout; }
/*取消定时任务*/
void Cancel() { _cancelled = true; }
/*在析构函数中执行定时任务*/
~TimerTask() {
if(_cancelled == false) _task_cb();
_release_cb();
}
private:
uint64_t _id; // 定时器任务id
uint32_t _timeout; // 超时时间
bool _cancelled; // 定时任务是否被取消
TaskFunc_t _task_cb; // 超时后执行的定时任务
ReleaseFunc_t _release_cb; // 用于删除TimerWheel中保存的定时器任务对象信息(weak_ptr)
};
using TaskPtr = std::shared_ptr<TimerTask>;
using TaskWeak = std::weak_ptr<TimerTask>;
/*时间轮*/
class TimerWheel {
private:
/*SetRelease回调函数 -- 从unordered_map中将定时任务信息移除*/
void RemoveTimer(uint64_t id) {
auto it = _timers.find(id);
if(it != _timers.end())
_timers.erase(it);
}
public:
TimerWheel()
: _capacity(60), _tick(0), _wheel(_capacity)
{}
/*添加定时任务*/
void TimerAdd(uint64_t id, uint32_t timeout, const TaskFunc_t &cb) {
// 实例化定时任务对象并交由智能指针管理
TaskPtr tp(new TimerTask(id, timeout, cb));
// 为了不增加引用计数,这里不能使用shared_ptr
_timers[id] = TaskWeak(tp);
tp->SetRelease(std::bind(&TimerWheel::RemoveTimer, this, id));
// 将定时任务对象的shared_ptr添加到时间轮中
int pos = (_tick + timeout) % _capacity;
_wheel[pos].push_back(tp);
}
/*刷新/延迟定时任务*/
void TimerReFlush(uint64_t id) {
// 通过定时任务的weak_ptr构造一个shared_ptr
auto it = _timers.find(id);
if(it == _timers.end()) return;
TaskPtr tp = (it->second).lock();
// 获取超时时间
uint32_t timeout = tp->Timeout();
// 将shared_ptr添加到时间轮中
int pos = (_tick + timeout) % _capacity;
_wheel[pos].push_back(tp);
}
/*取消定时任务*/
void TimerCancel(uint64_t id) {
// 通过定时任务的weak_ptr构造一个shared_ptr
auto it = _timers.find(id);
if(it == _timers.end()) return;
TaskPtr tp = (it->second).lock();
// 通过shared_ptr设置定时任务的取消状态
tp->Cancel();
}
/*执行定时任务 -- 此函数应一秒被执行一次,相当于秒针向后移动一步*/
void RunTimerTask() {
_tick = (_tick + 1) % _capacity;
// 将秒针对应位置的定时任务队列清空,即释放定时任务对象的shared_ptr
_wheel[_tick].clear();
}
private:
int _capacity; // 容量,代表最大延迟时间
int _tick; // 当前的秒针,指向哪里就执行哪里的定时任务
std::vector<std::vector<TaskPtr>> _wheel; // 时间轮
std::unordered_map<uint64_t, TaskWeak> _timers; // 定时器任务id与管理定时任务对象的weak_ptr之间的关联关系
};
测试如下:
class Test {
public:
Test() { std::cout << "构造" << std::endl; }
~Test() { std::cout << "析构" << std::endl; }
};
void Deltest(Test *t) { delete t; }
int main()
{
Test *t = new Test();
TimerWheel tw;
tw.TimerAdd(1, 5, std::bind(Deltest, t));
// 刷新定时任务
for(int i = 0; i < 5; i++) {
sleep(1);
// 刷新定时任务
tw.TimerReFlush(1);
// 向后移动秒针
tw.RunTimerTask();
std::cout << "刷新定时任务超时时间为5s" << std::endl;
}
// 取消定时任务
// tw.TimerCancel(1);
// std::cout << "取消定时任务" << std::endl;
// 让秒针向后走
while(true) {
sleep(1);
tw.RunTimerTask();
std::cout << "秒针向后移动..." << std::endl;
}
return 0;
}
4. eventfd
eventfd 是 Linux 中的一种事件通知机制,即创建一个描述符用于实现事件通知,其本质就是在内核中管理一个计数器 (创建 eventfd 就会在内核中创建一个计数器结构)。
Linux 中一切皆文件,因此 eventfd 也是以文件的形式进行操作的,即我们可以通过 write 函数向 evenfd 中写入一个数值,此数值表示事件通知的次数,然后通过 read 函数读取 eventfd 中事件通知的次数 (计数器的值)。
eventfd 通常与 epoll 搭配使用 – 通过 epoll 进行描述符事件监控,当描述符有事件就绪时就读取 eventfd 中的值,然后进行事件处理。在本项目中,eventfd 的作用是在 EventLoop 模块中实现线程间的事件通知功能。
eventfd 系统调用的使用方法如下:
#include <sys/eventfd.h>
int eventfd(unsigned int initval, int flags);
initval: eventfd中计数器的初始值,通常设置为0;
flags:设置描述符属性,通常设置 EFD_CLOEXEC 和 EFD_NONBLOCK 即可。
EFD_CLOEXEC -- 禁止进程复制
EFD_NONBLOCK -- 设置描述符非阻塞属性
返回值:返回一个文件描述符用于操作。
需要注意的是,我们向 eventfd 中写入事件通知的次数时需要写入一个 8 字节的数字,对应的,我们从 eventfd 中读取数据时也需要用一个 8 字节的变量来保存。
最后,我们要注意区分 eventfd 和 信号以及信号量的区别:
- 信号是进程间事件通知的一种方式,被通知的事件由进程中的哪一个线程来处理是不确定的;而 eventfd 可以用于线程间的事件通知。
- 信号量和 eventfd 本质上都是维护了一个计数器,不同的是,每次通过 write 向 eventfd 写入的 val 表示事件通知的次数,而这个 val 可以大于1,通过 read 读取 eventfd 时,eventfd 中的计数器会被直接清0;而信号量每次PV操作只能让计数器 -1或者+1。当然,二者最本质的区别在于 eventfd 是一种事件通知机制,而信号量是一种进程间同步机制。
eventfd 的使用示例如下:
#include <iostream>
#include <cstdio>
#include <unistd.h>
#include <fcntl.h>
#include <sys/eventfd.h>
int main() {
int fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
if (fd < 0) {
printf("eventfd failed\n");
return 1;
}
// 每次向eventfd中写入1,表示一次事件通知
// 8字节数据
uint64_t val = 1;
write(fd, &val, sizeof val);
write(fd, &val, sizeof val);
write(fd, &val, sizeof val);
// 读取结果表示事件通知的次数,读取完毕后计数器清0
uint64_t res;
read(fd, &res, sizeof res);
printf("%ld\n", res);
return 0;
}
测试结果如下:
5. regex
由于我们要实现的是一个带有应用层协议 (HTTP) 支持的服务器组件,因此必然会涉及到对 HTTP 请求的解析,比如我们接收到了这样的一个 HTTP 请求:
GET /login?user=zhangsan&pass=123456 HTTP/1.1\r\n
那么我们需要从 HTTP 请求中提取出以下的信息:
GET -- 请求方法,我们需要针对不同的请求方法执行不同的操作
/login -- 请求URL,我们需要知道客户端想要访问哪里的数据
user=zhangsan&pass=123456 -- 请求数据
HTTP/1.1 -- 协议版本
如果要我们自己遍历请求字符串提取出上述的信息无疑是非常麻烦的,为了简化操作,我们可以使用正则表达式来提取字符串中的特定数据。正则表达式 (regular expression) 描述了一种字符串匹配的模式 (pattern),可以用来检查一个串中是否含有某种子串、或者将匹配的子串替换、亦或是从某个串中取出符合某个条件的子串等。简单来说,正则表达式就是基于某种字符串匹配规则来提取字符串中的特定数据。
正则表达式的使用,可以使得 HTTP 请求的解析更加简单 (这里指的时程序员的工作变得的简单,这并不代表处理效率会变高,实际上效率上是低于直接的字符串处理的),使我们实现的 HTTP 组件库使用起来更加灵活。而 regex 则是 C++ 为我们提供的正则库。
对于简单的 regex 使用,我们只需要掌握 regex_match 函数的使用即可:
// 简单理解 regex_match 函数
bool regex_match(string src, smatch matches, regex e);
src: 用于匹配的原始字符串;
e: 字符串的匹配规则;
matches: 用于存在根据匹配规则e对原始字符串src进行匹配得到的结果;
返回值:匹配成功返回true,匹配失败返回false
正则表达式的匹配规则是固定的,下面我列举几个较为常用的,其余的大家可以 点击 查看:
-字符 | -描述 |
---|---|
\ | 将下一个字符标记为一个特殊字符、或一个原义字符、或一个向后引用、或一个八进制转义符。例如,“n ”匹配符“n ”。“\n ”匹配一个换行符。串行“\\ ”匹配“\ ”而“\( ”则匹配“( ”。 |
* | 匹配前面的子表达式零次或多次。例如,zo*能匹配“z ”以及“zoo ”。*等价于{0,}。 |
+ | 匹配前面的子表达式一次或多次。例如,“zo+ ”能匹配“zo ”以及“zoo ”,但不能匹配“z ”。+等价于{1,}。 |
? | 匹配前面的子表达式零次或一次。例如,“do(es)? ”可以匹配“does ”或“does ”中的“do ”。?等价于{0,1}。 |
. | 匹配除“\ n ”之外的任何单个字符。要匹配包括“\ n ”在内的任何字符,请使用像“`(. |
x|y | 匹配x或y。例如,“`z |
下面是正则表达式以及 C++ 正则库使用的一个简单示例:
bool test1() {
// 原始字符串
std::string str = "/number/123";
// 正则表达式(字符串匹配规则)
// 前面的 "/number/" 表示原字符串,() 表示开始根据模式进行匹配,"\\" 转义为 "\", "\d"表示匹配数字字符,+表示匹配一次或多次
std::regex e("/number/(\\d+)");
// 存放匹配结果的容器
std::smatch matches;
// 正则匹配
bool ret = std::regex_match(str, matches, e);
if(ret == false) return false;
for(auto &e : matches) {
std::cout << e << std::endl;
}
return true;
}
从输出结果我们可以发现,smatch 会先将原字符串进行保存,然后才会保存正则匹配的结果。
下面我们给出 HTTP 请求的正则匹配示例:
bool test2() {
// HTTP请求行格式:"GET /login?user=zhangsan&pass=123456 HTTP/1.1\r\n"
std::string str = "GET /login?user=zhangsan&pass=123456 HTTP/1.1\r\n";
std::smatch matches;
// 正则表达式
// 提取请求方法:(GET|POST|PUT|DELETE|HEAD) -- |表示或者,()表示提取匹配结果,整体表示提取其中任意一个字符串
// 提取请求URI:_([^?]*) -- 我们用_表示空格,[^?]表示匹配除?号外的所有字符,*表示可以多次匹配
// 提取数据:\\?(.*) -- \\?表示普通的?字符,(.*)表示可以多次匹配任意字符并提取\
// 外边的(?:)?表示如果没有数据,则匹配但不提取
// 提取协议版本:_ (HTTP/1\\.[01]) -- _表示空格,HTTP/1\\.表示匹配原字符串中的HTTP/1\\.,其中\\.表示普通的.,最后[01]表示匹配0或者1
// \r\n处理:(?:\n|\r\n)? -- (?:xxx)表示匹配xxx格式字符串但不提取,最后的?表示执行前面的表达式一次或零次
std::regex e("(GET|POST|PUT|DELETE|HEAD) ([^?]*)(?:\\?(.*))? (HTTP/1\\.[01])(?:\n|\r\n)?");
if(std::regex_match(str, matches, e) == false) return false;
for(int i = 0; i < matches.size(); i++) {
std::cout << i << ":" << matches[i] << std::endl;
}
return true;
}
6. any
在本项目中,我们要实现一个高并发的服务器组件,能够的接收并处理客户端发送过来的请求,就必然涉及到与客户端的通信,而通信就必然涉及到对套接字的操作;同时,由于 TCP 是面向字节流的,因此服务器在接收客户端数据的时候就可能出现 socket 中的数据不足一条完整请求的情况,此时我们请求处理到一半时就需要停下来等待 socket 中下一次的数据到来。
因此我们需要为客户端连接设置一个请求处理的上下文,用来保存请求接收、解析以及处理的状态,它决定着对于下一次从缓冲区中取出的数据如何进行处理、从哪里开始处理等。同时,对于一条完整的请求,我们还需要对其进行解析,得到各种关键的要素,比如 HTTP 请求中的请求方法、请求URL、HTTP版本等,这些信息都会被保存在请求处理上下文中。
那么我们应该如何保存请求接收、解析以及处理的各种状态信息呢,定义一个 HTTP 请求信息的结构用于填充吗?如果我们的服务器组件仅支持 HTTP 协议这样做是可以的,但我们设计的服务器的目标是要能够支持各种不同的应用层协议,便于我们组件的使用者能够根据自己不同的业务场景定制对应的应用层协议进行使用,因此我们就需要让这个结构能够保存不同类型的数据,此时就需要 any 出场了。
通用容器 any 是一种能够存储任何类型数据的容器。在C语言中,通用类型可以使用 void* 来管理;在C++中,C++17 STL库中提供了可直接使用的 any 类,但由于 any 类的实现并不复杂,同时考虑到代码的移植性,尽量减少第三方库的依赖,所以这里我们选择自己手动实现一个 any 类。
下面是对 any 类设计的一些思考:
-
首先Any类肯定不能是一个模板类,否则编译的时候 Any<int> a or Any<float>b 需要传递类型作为模板参数,也就是说在使用的时候就要确定其类型;这是行不通的,因为我们在定义 any 对象的时候是不知道保存在 Content 中的协议上下文的协议类型的,因此无法传递类型作为模板参数。
template<class T> class Any { private: T _content; }; Any<int> a1; Any<float> a2;
-
接下来我们考虑在 any 类的内部嵌套一个模板类 holder,这样也不行,在 Any 类中无法定义 holder 类的对象或指针,因为 any 也不知道这个类要保存什么类型的数据,因此无法传递类型参数。
class Any{ private: template<class T> class hodler{ T _val; }; hodler<T> _content; };
-
下面我们在嵌套模板类的基础上做进一步的优化,利用 C++ 多态的思想,在 any 内部嵌套两个类,一个普通父类 hodler,然后一个模板类 placehodler 继承自 hoder,最后在 Any 中定义一个父类的指针即可。
由于父类 hodler 是普通类,所以 Any 中可以定义父类的指针,同时又因为子类 placehodler继承自 hodler,所以当 Any 容器需要保存一个数据的时候,只需要 new 一个带有模板参数的子类 placehodler 对象出来保存数据,然后让 Any 类中的父类指针,指向这个子类对象就行了。(运行时多态,父类指针指向子类对象,调用的是子类的虚函数)
class Any{ private: class hodler { // ... }; template<class T> class placehodler : public hodler { T_ val; }; hodler *_content; }; // 定义any时不需要指定any的类型 Any any; // 需要存储特定数据类型时,直接new该类型对应的placehodler对象,然后将其交由any中的父类指针_content管理 placehodler<int> *h1 = new placehodler; any._content = h1; placehodler<double> *h2 = new placehodler; any._content = h2;
Any 类的实现代码如下:
#include <iostream>
#include <typeinfo>
#include <utility>
class Any {
private:
class holder {
public:
virtual ~holder() {}
virtual const std::type_info& type() = 0;
virtual holder *clone() = 0;
};
template<class T>
class placeholder : public holder {
public:
placeholder(const T &val) : _val(val) {}
virtual ~placeholder() {}
/*获取子类保存的数据的类型*/
virtual const std::type_info& type() { return typeid(T); }
/*针对当前对象自身,克隆出一个新的子类对象*/
virtual holder *clone() { return new placeholder(_val); }
public:
T _val; // 真正需要保存的数据
};
public:
Any() : _content(nullptr) {}
/*通过特定类型数据创建Any对象*/
template<class T>
Any(const T &val) : _content(new placeholder<T>(val)) {}
/*通过其他Any对象创建一个新的Any对象*/
Any(const Any &other) : _content(other._content == nullptr ? nullptr : other._content->clone()) {}
~Any() { if(_content) delete _content; }
/*获取子类placeholder中保存的数据的指针*/
template<class T>
T* get() {
// 想要获取的数据的类型与保存的数据的类型必须是一致的
if(typeid(T) != _content->type()) return nullptr;
// 由于父类holder中没有val,所以需要进行强转
return &((placeholder<T>*)_content)->_val;
}
/*交换两个Any对象中的指针*/
Any& swap(Any &other) { std::swap(_content, other._content); }
/*赋值运算符重载*/
template<class T>
Any& operator=(const T &val) {
// 利用临时对象来完成赋值操作,赋值完成后原对象中的资源出作用域自动释放
Any(val).swap(*this);
return *this;
}
Any& operator=(Any other) {
// 利用形参的拷贝构造(临时对象)来完成赋值操作,赋值完成后原对象中的资源出作用域自动释放
swap(other);
return *this;
}
private:
holder *_content; // 保存父类指针,利用多态性质完成Any类保存任意类型数据的功能
};
测试代码如下:
class Test {
public:
Test() { std::cout << "构造" << std::endl; }
Test(const Test& t) { std::cout << "拷贝构造" << std::endl; }
~Test() { std::cout << "析构" << std::endl; }
};
int main()
{
Any any;
any = 10;
int *pa = any.get<int>();
std::cout << *pa << std::endl;
any = 10.24;
double* pd = any.get<double>();
std::cout << *pd << std::endl;
any = std::string("hello");
std::string *ps = any.get<std::string>();
std::cout << *ps << std::endl;
// 测试是否存在内存泄露
{
any = Test();
}
return 0;
}
三、框架设计
1. 项目模块划分
由于本项目实现的是一个带有协议支持的 Reactor 模型高性能服务器组件,因此将整个项目划分为两个大的模块:
- SERVER 模块:实现 Reactor 模型的 TCP 服务器。
- 协议模块:对当前的 Reactor 模型服务器提供应用层协议支持。(目前仅提供 HTTP 协议支持)
1.1 SERVER 模块
SERVER 模块就是对所有的连接以及线程进行管理,让它们各司其职,在合适的时候做合适的事,最终完成高性能服务器组件的实现。管理具体分为三个方面:
- 监听连接管理:对监听连接进行管理。
- 通信连接管理:对通信连接进行管理。
- 超时连接管理:对超时连接进行管理。
基于以上的管理思想,可以将 SERVER 模块划分为以下的一些子模块:
- Buffer 模块:实现通信套接字的用户态缓冲区,防止接收到的数据不是一条完整的数据,同时确保客户端响应的数据在套接字可写的情况下进行发送。
- Socket 模块:对 socket 套接字的操作进行封装,使得程序中对于套接字的各项操作更加简便。
- Channel 模块:对于一个描述符进行监控事件管理,便于在用户态对描述符的监控事件进行维护。
- Connection 模块:对通信连接进行整体管理,一个连接的所有操作都通过此模块来完成,增加连接操作的灵活以及便捷性。
- Acceptor 模块:对监听套接字进行管理,为客户端的新建连接创建 Connection 对象,并设置各种回调。
- TimerQueue 模块:定时任务模块,让一个任务可以在指定的时间之后被执行。
- Poller模块:对任意的描述符进行IO事件监控,本质上就是对 epoll 的各种操作进行封装,从而让对描述符进行事件监控的操作更加简单,此模块是 Channel 模块的一个子模块。
- EventLoop 模块:对事件监控进行管理,为了确保线程安全,此模块一个模块对应一个线程,服务器中的所有的事件都是由此模块来完成。
- LoopThread 模块:将 EventLoop 与 thread 整合到一起,向外部返回所实例化的 EventLoop 对象,即将 EventLoop 对象与线程一一绑定。
- LoopThreadPool 模块:LoopThread 线程池,用于对所有的 LoopThread 进行管理及分配。
- TcpServer 模块:对前边所有子模块进行整合,从而提供给组件使用者的可以便捷的完成一个高性能服务器搭建的模块。
1.2 协议模块
协议模块用于对 SERVER 模块提供应用层协议支持,基于提供的协议支持能够更方便的完成指定协议服务器的搭建,同时还能够根据不同的应用场景切换不同的应用层协议。项目目前只提供了 HTTP 协议支持。
HTTP 协议支持模块可以划分为以下几个子模块:
- Util 模块:工具模块,提供 HTTP 协议模块所用到的一些工具函数,比如 URL 编码与解码、文件数据读取与写入等。
- HttpRequest 模块:HTTP 请求数据模块,用于保存 HTTP 请求数据被解析后的各项请求元素信息。
- HttpResponse 模块:HTTP 响应数据模块,用于业务处理后设置并保存 HTTP 响应数据的的各项元素信息。
- HttpContext 模块:HTTP 请求接收的上下文模块,用于防止在一次接收的数据中不是一个完整的 HTTP 请求,便于后续接收新数据后继续根据上下文进行解析。
- HttpServer 模块:对 HTTP 协议支持的所有模块的整合,让HTTP服务器的搭建变得更加简便。
整体的模块示意图如下:
2. 项目模块关系图
2.1 Connection 模块关系图
2.2 Acceptor 模块关系图
2.3 EventLoop 模块关系图
四、SERVER 模块开发
1. Buffer 模块
Buffer 模块的设计思想如下:
具体实现时的一些细节如下:
- 应该选用 STL 中的 vector<char> 而不是 string 来保存数据,因为 string 遇到 ‘\0’ 就会结束,而通信数据中可能会包含 ‘\0’。
- 应该分别记录写入数据位置和读取数据位置,避免每次写入数据时还有重新遍历数组寻找可写入位置。
- 由于缓冲区的默认大小是固定的且数据是不断到来的,为了充分利用空间,在插入新数据时如果后续剩余空间不够我们应该将超出的数据存放到前方已读取数据的空间中,如果前后所有的空间合并起来都不够时,再考虑扩容。
- 最后,我们应该专门为字符串以及 HTTP 提供一些相关的功能接口,从而便于我们后续使用,比如重载读取与写入 string 类型的数据、从缓冲区中取出一行HTTP请求数据等。
Buffer 类的实现代码如下:
/*用户缓冲区类*/
class Buffer {
public:
Buffer() : _capacity(DEFAULT_BUFFER_SIZE), _ridx(0), _widx(0), _buffer(_capacity) {}
Buffer(const Buffer &buf) : _capacity(buf._capacity), _ridx(buf._ridx), _widx(buf._widx), _buffer(buf._buffer) {}
/*返回缓冲区中可读数据的大小*/
uint32_t ReadAbleSize() const { return _widx - _ridx; }
/*返回当前读取数据的起始地址*/
const char* ReadStartAddr() const { return &_buffer[_ridx]; }
/*将读取数据位置向后偏移*/
void MoveReadOffset(uint32_t len) {
assert(len <= ReadAbleSize());
_ridx += len;
}
/*清空缓冲区中的数据*/
void Clear() {
Buffer buf;
std::swap(*this, buf);
}
/*从缓冲区读取数据*/
void Read(void* buf, uint32_t len) {
// 要读取的数据大小一定要小于等于可读数据大小
assert(len <= ReadAbleSize());
// 获取当前读取数据的起始地址
char *raddr = &_buffer[_ridx];
// 拷贝数据
std::copy(raddr, raddr + len, (char*)buf);
// 更新当前读取数据位置
_ridx += len;
}
/*重载读取string类型的数据*/
std::string Read(uint32_t len) {
std::string str;
str.resize(len);
Read((void*)&str[0], len);
return str;
}
/*确保缓冲区可写入空间足够(拷贝+扩容)*/
void CheckWriteSpace(uint32_t len) {
// 获取缓冲区可写入空间大小 -- 写入位置后面未使用的空间+读取位置前已读取的空间
uint32_t size = _ridx + (_capacity - _widx);
// 空间足够则调整数据分布
if (size >= len) {
std::copy(&_buffer[_ridx], &_buffer[_ridx] + ReadAbleSize(), &_buffer[0]);
// 这里需要先更新写位置,再更新读位置,否则ReaableSize()得到的结果不同
_widx = ReadAbleSize();
_ridx = 0;
}
// 空间不够则扩容,一次扩容两倍
else {
_capacity *= 2;
_buffer.resize(_capacity);
// 扩容完毕后再次检查可写入空间是否足够
CheckWriteSpace(len);
}
}
/*向缓冲区中写入数据*/
void Write(const void *data, uint32_t len) {
// 确保缓冲区可写入空间足够
CheckWriteSpace(len);
// 写入数据到缓冲区
std::copy((char*)data, (char*)data + len, &_buffer[_widx]);
// 更新当前写入数据位置
_widx += len;
}
/*重载写入string类型的数据*/
void Write(const std::string &data, uint32_t len) {
Write((const void*)data.c_str(), len);
}
// /*重载写入Buffer类型的数据*/
// void Write(const Buffer &data, uint32_t len) {
// // 取出data缓冲区中的所有数据
// Write((const void*)data.ReadStartAddr(), data.ReadAbleSize());
// }
/*清空缓冲区中数据*/
void Clean() {
// 只需要重置读取写入位置即可,不需要清空缓冲区中的原数据,后续写入直接覆盖原数据
_ridx = 0;
_widx = 0;
}
/*针对HTTP请求行的一些操作*/
/*查找请求行结束符\r\n*/
char* FindCRLF() { return (char*)memchr((void*)ReadStartAddr(), '\n', ReadAbleSize()); }
/*从缓冲区中取出一行HTTP请求数据*/
std::string GetLine() {
char *pos = FindCRLF();
if (pos == nullptr) return std::string();
// +1是为了将'\n'也取出来
return Read(pos - ReadStartAddr() + 1);
}
private:
uint32_t _capacity; // 缓冲区大小
uint32_t _ridx; // 当前读取数据位置
uint32_t _widx; // 当前写入数据位置
std::vector<char> _buffer; // 使用vector模拟缓冲区而不是string
};
2. Socket 模块
Socket 模块的设计思想如下:
具体实现时的一些细节如下:
- Socket 类的目的是对 socket 原生的各种操作进行封装,便于我们后面使用,但即使是这样,创建一个服务端/客户端连接的步骤也显得较为繁琐,所以我们在 Socket 类中提供了直接创建一个服务端连接以及直接创建一个客户端连接的接口。
- 在 TCP 中,一个连接 bind 了一个地址与端口后,一旦连接断开则会进入 time_wait 状态,此时连接不会立即释放,会继续占用地址和端口,这种策略是用来保护客户端的,但它也会造成我们服务器崩溃后不能立即重新启动,因此我们需要对服务端连接设置套接字选项,开启地址与端口复用。
- 我们通过 recv/send 系统调用来读取与发送 socket 中的数据时,一般会直接将 socket 缓冲区读空或者写满,而由于套接字默认是阻塞的,因此这会导致我们的程序阻塞在 recv/send 函数这里,因此我们还需要为套接字设置非阻塞属性。
Socket 类的实现代码如下:
/*等待连接队列的最大长度*/
#define G_BACKLOG 1024
/*套接字类*/
class Socket {
public:
Socket() : _sockfd(-1) {}
Socket(int fd) : _sockfd(fd) {}
~Socket() { Close(); }
/*返回套接字*/
int Fd() { return _sockfd; }
/*创建套接字*/
bool CreateSocket() {
_sockfd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if(_sockfd == -1) {
LOG(ERROR, "socket create failed");
return false;
}
return true;
}
/*绑定ip+port*/
bool Bind(uint16_t port, const std::string &ip = "0.0.0.0") {
struct sockaddr_in addr;
memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
addr.sin_addr.s_addr = inet_addr(ip.c_str());
socklen_t len = sizeof(struct sockaddr_in);
int n = bind(_sockfd, (const sockaddr*)&addr, len);
if(n == -1) {
LOG(ERROR, "socket bind failed: %s", strerror(errno));
return false;
}
return true;
}
/*开始监听*/
bool Listen(int g_backlog = G_BACKLOG) {
int n = listen(_sockfd, g_backlog);
if(n == -1) {
LOG(ERROR, "socket listen failed");
return false;
}
return true;
}
/*获取新连接*/
int Accept() {
// 这里我们不关心客户端的ip和port
int newfd = accept(_sockfd, nullptr, nullptr);
if(newfd == -1) {
LOG(ERROR, "accept new link failed");
}
return newfd;
}
/*向服务器发起连接*/
bool Connect(uint16_t port, const std::string &ip) {
struct sockaddr_in addr;
memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
addr.sin_addr.s_addr = inet_addr(ip.c_str());
socklen_t len = sizeof(struct sockaddr_in);
int n = connect(_sockfd, (const sockaddr*)&addr, len);
if(n == -1) {
LOG(ERROR, "connect server failed");
return false;
}
return true;
}
/*接收数据*/
ssize_t Recv(void* buf, size_t len) {
// flag为0表示不使用任何特殊标志,此时函数是否阻塞由套接字属性决定
// 所以我们可以通过SetNonBlock接口将套接字设置为非阻塞,这样send/recv函数也会变成非阻塞
if (len == 0) return 0;
ssize_t n = recv(_sockfd, buf, len, 0);
// 接收失败
// 客户端退出
if (n == 0) {
LOG(ERROR, "client quit");
return -1;
}
else if (n < 0) {
// 非阻塞下接收缓冲区无数据或者读取被中断
if (errno == EAGAIN || errno == EINTR) {
LOG(DEBUG, "socket recv EAGAIN or EINTR");
return 0;
}
// 读取错误
LOG(ERROR, "socket recv failed");
return -1;
}
// 返回接收到的数据字节数
return n;
}
/*发送数据*/
ssize_t Send(const void* buf, size_t len) {
if (len == 0) return 0;
ssize_t n = send(_sockfd, buf, len, 0);
// 发送失败
// 客户端退出
if (n == 0) {
LOG(ERROR, "client quit");
return -1;
}
else if (n < 0) {
// 非阻塞下发送缓冲区满且写事件未就绪或者发送被中断
if (errno == EAGAIN || errno == EINTR) {
LOG(DEBUG, "socket send EAGAIN or EINTR");
return 0;
}
// 发送错误
LOG(ERROR, "socket send failed");
return -1;
}
// 发送成功返回实际发送的字节数
return n;
}
/*设置套接字选项 -- 开启地址与端口复用*/
void SetReuseAddr() {
// 1代表设置套接字选项等级 SOL -- socket options level
int optval = 1;
int n1 = setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR, (void*)&optval, sizeof optval);
optval = 1;
int n2 = setsockopt(_sockfd, SOL_SOCKET, SO_REUSEPORT, (void*)&optval, sizeof optval);
// if (n1 == 0 && n2 == 0) LOG(DEBUG, "SetReuseAddr Success");
// else LOG(ERROR, "SetReuseAddr Failed: %s", strerror(errno));
}
/*设置套接字非阻塞属性*/
void SetNonBlock() {
//获取fd当前选项
int flag = fcntl(_sockfd, F_GETFL);
//将非阻塞选项设置进fd中
fcntl(_sockfd, F_SETFL, flag | O_NONBLOCK);
}
/*关闭套接字*/
void Close() {
if (_sockfd != -1) {
close(_sockfd);
_sockfd = -1;
}
}
/*直接创建一个服务端连接*/
bool CreateServer(uint16_t port, const std::string &ip = "0.0.0.0", bool non_block = true) {
// 创建套接字、设置地址重用、设置非阻塞、绑定、监听
// 服务器连接一般bind任意ip,所以ip缺省值为0.0.0.0
if (CreateSocket() == false) return false;
// 注意:设置地址重用必须放在bind之前,这样才会生效
SetReuseAddr();
if (non_block) SetNonBlock();
if (Bind(port, ip) == false) return false;
if (Listen() == false) return false;
return true;
}
/*直接创建一个客户端连接*/
bool CreateClient(uint16_t port, const std::string &ip) {
// 创建套接字、连接服务器
// 客户端不需要显示的bind,有OS自动bind,也不需要listen与accept,只需要connect
if (CreateSocket() == false) return false;
if (Connect(port, ip) == false) return false;
return true;
}
private:
int _sockfd;
};
3. Channel 模块
Channel 模块的设计思想如下:
具体实现时的一些细节如下:
- 通信描述符事件触发后会调用回调函数进行处理,而这个回调函数是由 Connection 模块设置给 Channel 模块的,因为 Connection 是对通信连接进行整体管理的一个模块,Channel 模块只是 Connection 模块的一个子模块。
- 为了保证线程安全,添加/修改/移除事件监控的操作需要放到 Connection 对象关联的 EventLoop 对应的线程中去执行,同时,对描述监控事件的修改最后也必须通过 Poller 模块中的 epoll 相关函数来完成,而 Poller 模块也是 EventLoop 的一个子模块。
Channel 类的实现代码如下:
class EventLoop;
/*描述符监控事件管理类*/
class Channel {
using EventCallback = std::function<void()>;
public:
Channel(EventLoop *loop, int fd) : _loop(loop), _fd(fd), _events(0), _revents(0) {}
/*返回文件描述符*/
int Fd() { return _fd; }
/*返回被监控的事件*/
uint32_t Events() { return _events; }
/*各种回调函数的设置 -- 对应事件被触发后在HandleEvent函数中调用*/
void SetReadCallback(const EventCallback &cb) { _read_cb = cb; }
void SetWriteCallback(const EventCallback &cb) { _write_cb = cb; }
void SetCloseCallback(const EventCallback &cb) { _close_cb = cb; }
void SetEventCallback(const EventCallback &cb) { _event_cb = cb; }
void SetErrorCallback(const EventCallback &cb) { _error_cb = cb; }
/*启动可读事件监控*/
void EnableRead() {
_events |= EPOLLIN;
Update();
}
/*启动可写事件监控*/
void EnableWrite() {
_events |= EPOLLOUT;
Update();
}
/*判断当前是否监控了可读事件*/
bool Readable() { return (_events & EPOLLIN); }
/*判断当前是否监控了可写事件*/
bool Writeable() { return (_events & EPOLLOUT); }
/*关闭可读事件监控*/
void DisableRead() {
_events &= ~EPOLLIN;
Update();
}
/*关闭可写事件监控*/
void DisableWrite() {
_events &= ~EPOLLOUT;
Update();
}
/*关闭所有事件监控*/
void DisableEvent() {
_events = 0;
Update();
}
/*移除事件监控*/
/*添加/修改事件监控*/
// 注意:由于我们我们上面只是声明了有EventLoop这个类,并不知道EventLoop里面有哪些函数
// 但是Remove和Update又需要调用EventLoop::RemoveEvent和EventLoop::UpdateEvent
// 所以我们需要将它们的实现放到EventLoop类后面,这里仅仅是声明
void Remove();
void Update();
/*设置就绪事件*/
void SetRevents(uint32_t events) { _revents = events; }
/*事件处理 -- 事件触发后调用(函数内部通过_revents判断事件类型)*/
void HandleEvent() {
// EPOLLIN读事件就绪/EPOLLRDHUP客户端连接半关闭(需要读取缓冲区中可能的剩余数据)/EPOLLPRI存在优先带外数据
if ((_revents & EPOLLIN) || (_revents & EPOLLRDHUP) || (_revents & EPOLLPRI)) {
if (_read_cb) _read_cb();
}
// 有可能会让连接释放的就绪事件一次只执行一个
// EPOLLOUT写事件就绪
if (_revents & EPOLLOUT) {
if (_write_cb) _write_cb();
}
// EPOLLHUP客户端连接关闭事件就绪
else if (_revents & EPOLLHUP) {
if (_close_cb) _close_cb();
}
// EPOLLERR错误事件就绪
else if (_revents & EPOLLERR) {
if (_error_cb) _error_cb();
}
// 虽然错误事件可能导致连接释放,但由于连接释放操作都是先压入到任务队列中,等待业务处理完毕后才会执行任务队列中的任务
// 即连接真正被释放一定是在就绪事件全部处理完毕后,所以这里可以将event_cb放到最后,这样也保证了连接的活跃度刷新是在业务处理完毕后进行的
if (_event_cb) _event_cb();
}
private:
int _fd; // 进行监控事件管理的连接(描述符)
EventLoop *_loop; // 描述符事件管理与处理句柄
uint32_t _events; // 当前连接需要被监控的事件
uint32_t _revents; // 当前连接已经触发的事件
EventCallback _read_cb; // 可读事件被触发的回调函数
EventCallback _write_cb; // 可写事件被触发的回调函数
EventCallback _close_cb; // 连接断开事件被触发的回调函数
EventCallback _event_cb; // 任意事件被触发的回调函数
EventCallback _error_cb; // 错误事件被触发的回调函数
};
/*Channel::Remove和Channel::Update函数的实现*/
/*移除事件监控*/
void Channel::Remove() { return _loop->RemoveEvent(this); }
/*添加/修改事件监控*/
void Channel::Update() { return _loop->UpdateEvent(this); }
4. Poller 模块
Poller 模块的设计思想如下:
具体实现时的一些细节如下:
- 由于描述符需要被监控的事件 _events 以及事件触发后的各种回调函数都保存在 Channel 中,并且就绪事件也需要保存到 Channel 的 _revents 中,因此在 Poller 中我们需要保存描述符与 Channel 的关联关系,这样才能知道要添加哪些事件监控,以及事件就绪后应该如何处理。
Poller 类的实现代码如下:
/*就绪数组一次能够容纳的最大事件数量*/
#define MAX_EPOLLEVENTS 1024
/*epoll_create参数,此参数现已被忽略,但要求大于0*/
#define MAX_SIZE 1024
/*描述符IO事件监控类*/
class Poller {
private:
/*对epoll相关函数的直接操作*/
void Update(Channel *channel, int op) {
int fd = channel->Fd();
struct epoll_event ev;
ev.events = channel->Events();
ev.data.fd = fd;
int ret = epoll_ctl(_epfd, op, fd, &ev);
if (ret == -1) {
LOG(ERROR, "fd %d epoll_ctl failed", fd);
return;
}
}
/*判断一个Channel是否已经添加了事件监控*/
bool HasChannel(Channel *channel) {
if (channel == nullptr) return false;
auto it = _channels.find(channel->Fd());
return !(it == _channels.end());
}
public:
Poller() {
// 创建epoll模型
_epfd = epoll_create(MAX_SIZE);
if (_epfd == -1) {
LOG(FATAL, "epoll_create failed: %s", strerror(errno));
// epoll模型创建失败直接退出程序
abort();
}
}
/*添加或修改事件监控*/
void UpdateEvent(Channel *channel) {
// 没添加则添加,已经添加则修改
if (HasChannel(channel) == false) {
_channels[channel->Fd()] = channel;
return Update(channel, EPOLL_CTL_ADD);
}
return Update(channel, EPOLL_CTL_MOD);
}
/*移除事件监控*/
void RemoveEvent(Channel *channel) {
// 存在则移除,不存在直接返回
auto it = _channels.find(channel->Fd());
if (it == _channels.end()) return;
_channels.erase(it);
Update(channel, EPOLL_CTL_DEL);
}
/*启动事件监控并返回活跃连接*/
void Poll(std::vector<Channel*> *actives) {
// timeout设为-1表示阻塞式监控
int nfds = epoll_wait(_epfd, _evs, MAX_EPOLLEVENTS, -1);
if (nfds < 0) {
if (errno == EINTR) {
LOG(DEBUG, "epoll wait EINTR");
return;
}
LOG(FATAL, "epoll wait failed: %s", strerror(errno));
// 事件监控出错直接退出程序
abort();
}
// 设置实际就绪的事件
for (int i = 0; i < nfds; i++) {
int fd = _evs[i].data.fd;
auto it = _channels.find(fd);
// 连接如果就绪,那么对应的信息一定存在于_channel中
assert(it != _channels.end());
Channel *channel = it->second;
channel->SetRevents(_evs[i].events);
actives->push_back(channel);
}
}
private:
int _epfd; // epoll操作句柄
struct epoll_event _evs[MAX_EPOLLEVENTS]; // 就绪数组 -- 保存所有的活跃连接
std::unordered_map<int, Channel*> _channels; // 建立连接与其监控事件管理Channel句柄的关联关系
};
5. EventLoop 模块
EventLoop 模块的设计思想如下:
具体实现时的一些细节如下:
- 当我们监控了一个客户端连接后,一旦这个连接触发了事件,就需要调用对应的回调函数进行事件处理,而在我们处理事件的过程中如果此连接触发了新的事件,那么新事件的处理就有可能被分配到其他线程中去执行,这样就有可能会导致线程安全问题。
- 那么我们需要为每一个连接的操作都加一把锁来保证线程安全吗?这样做当然是可以的,但是没必要,因为当我们的连接很多时就需要创建很多的锁,这会造成不必要的资源开销;我们仅需将一个连接的事件监控,连接的事件处理以及连接的所有其他操作都放在同一个线程中去完成即可,即让连接与线程一一对应。
- 虽然连接无法直接与线程一一对应,但是 EventLoop 模块是与线程是一一对应的,因此我们只需将一个连接与一个 EventLoop 模块相绑定,从而间接完成连接与线程的一一绑定。
- 但是这样仍不保险,因为组件使用者可能自己设计了任务线程池,再一次对任务进行了分摊,在这种情况下我们并不能保证连接的所有操作都在同一个线程中完成,那么如何保证一个连接的所有操作都必定在 EventLoop 对应的线程中呢?
- 我们的解决方案是给 EventLoop 模块中添加一个任务队列,对连接的所有操作并不直接执行,而是将其进行一次封装,然后当作任务添加到任务队列中,最后等到连接所有的就绪事件处理完了 (都添加都任务队列中了),再去将任务队列中的所有任务一一执行;此时我们仅需要对这个任务队列加一把锁保证其线程安全即可。
- 我们举个例子,在一号线程中我们对连接1进行了事件监控,此时连接触发了事件A,事件A在一号线程中被执行,执行过程中触发了事件B,由于一号线程忙碌,因此事件B被分配到二号线程中执行 (假设外部设置了任务线程池),但事件A和事件B其实并没有被真正执行,而是仅仅压入任务队列后就返回了,最后得到所有就绪事件都被压入任务队列后,我们再在一号线程中逐个取出任务队列中的任务执行,从而保证线程安全。
- 最后,因为有可能因为等待描述符IO事件就绪,导致执行流流程阻塞,这时候任务队列中的任务将得不到执行,因此需要使用 eventfd 来进行事件通知,唤醒事件监控的阻塞。
EventLoop 类的代码实现如下:
/*描述符事件监控以及事件处理类*/
class EventLoop {
using TaskFunc = std::function<void()>;
private:
/*执行任务队列中的所有任务*/
void RunAllTask() {
std::vector<TaskFunc> tasks;
{
// 由于任务队列可能会被多个线程并发访问,所以需要加锁保护
std::unique_lock<std::mutex> lock(_mutex);
// 取出任务队列中的所有任务
std::swap(_tasks, tasks);
}
// 执行任务
for (auto &task : tasks) {
task();
}
}
/*创建event_fd*/
static int CreatEventFd() {
int evfd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
if (evfd == -1) {
LOG(FATAL, "eventfd failed: %s", strerror(errno));
abort();
}
return evfd;
}
/*eventfd的可读事件回调*/
void HandleRead() {
// 读取eventfd中的数据,使其计数器重新变为0(8字节)
uint64_t count;
ssize_t n = read(_event_fd, &count, 8);
if (n < 0) {
// 被信号中断或者非阻塞情况下无数据可读取
if (errno == EAGAIN || errno == EINTR) {
LOG(DEBUG, "read eventfd EAGAIN or EINTR");
return;
}
LOG(FATAL, "read eventfd failed: %s", strerror(errno));
abort();
}
}
/*唤醒可能因为没有IO事件监控而导致的阻塞*/
void WeakUpEventFd() {
// 向eventfd中写入数据(8字节)
uint64_t val = 1;
ssize_t n = write(_event_fd, &val, 8);
if (n < 0) {
// 被信号中断或者非阻塞情况下无数据可读取
if (errno == EAGAIN || errno == EINTR) {
LOG(DEBUG, "write eventfd EAGAIN or EINTR");
return;
}
LOG(FATAL, "write eventfd failed: %s", strerror(errno));
abort();
}
}
public:
EventLoop()
: _thread_id(std::this_thread::get_id()), _poller(), _event_fd(CreatEventFd()),
_evfd_channel(new Channel(this, _event_fd)), _timer_whell(this) {
// 设置_event_fd的可读事件回调并启动其可读事件
_evfd_channel->SetReadCallback(std::bind(&EventLoop::HandleRead, this));
_evfd_channel->EnableRead();
}
/*判断当前线程是否是EventLoop对应的线程*/
bool IsInLoop() { return _thread_id == std::this_thread::get_id(); }
/*确保当前线程就是EventLoop对应的线程*/
void AssertInLoop() { assert(_thread_id == std::this_thread::get_id()); }
/*判断将要执行的任务是否处于当前线程中,是则直接执行,不是则将其添加到任务队列中*/
void RunInLoop(const TaskFunc &task) {
if (IsInLoop()) return task();
return QueueInLoop(task);
}
/*将任务添加到任务队列中*/
void QueueInLoop(const TaskFunc &task) {
{
std::unique_lock<std::mutex> lock(_mutex);
_tasks.push_back(task);
}
// 唤醒可能因为没有IO事件监控而导致的阻塞
WeakUpEventFd();
}
/*添加/修改描述符的监控事件*/
void UpdateEvent(Channel *channel) { return _poller.UpdateEvent(channel); }
/*移除描述符的事件监控*/
void RemoveEvent(Channel *channel) { return _poller.RemoveEvent(channel); }
/*添加定时任务*/
void AddTimer(uint64_t id, uint32_t timeout, const TaskFunc& task) { return _timer_whell.AddTimer(id, timeout, task); }
/*刷新/延迟定时任务*/
void ReFlushTimer(uint64_t id) { return _timer_whell.ReFlushTimer(id); }
/*取消定时任务*/
void CancelTimer(uint64_t id) { return _timer_whell.CancelTimer(id); }
/*判断某个定时任务是否存在*/
bool HasTimer(uint64_t id) { return _timer_whell.HasTimer(id); }
/*启动EventLoop模块*/
void Start() {
while(true) {
// 开始监控,获取活跃连接
std::vector<Channel*> actives;
_poller.Poll(&actives);
// 处理活跃连接的就绪事件
for (auto &channel : actives) { channel->HandleEvent(); }
// 执行任务
RunAllTask();
}
}
private:
std::thread::id _thread_id; // EventLoop关联的线程ID
Poller _poller; // 描述符IO事件监控模块
int _event_fd; // eventfd -- 唤醒IO事件监控可能导致的阻塞
std::unique_ptr<Channel> _evfd_channel; // 管理_event_fd的监控事件
std::mutex _mutex; // 实现任务队列的线程安全
std::vector<TaskFunc> _tasks; // 任务队列
TimerWheel _timer_whell; // 定时器模块
};
6. TimerQueue 模块
TimerQueue 模块的设计思想如下:
具体实现时的一些细节如下:
- 在前面我们学习了 timerfd 的使用以及 timerwheel 的设计思想,而要实现一个完整的秒级定时器,就需要将这两个功能整合到一起:
- 一方面,我们将 timerfd 的超时时间设置为 1s,这样 timerfd 每秒钟就会触发一次可读事件 (timerfd 可读事件监控可以通过 EventLoop 来实现);另一方面,每当 timerfd 触发可读事件,我们就执行一次 TimerWheel 中的 RunTimerTask 函数,即执行秒针所在位置的所有超时事件。
- 这样,我们在 TimerWheel 定时器中记录所有的超时事件,然后使用 timerfd 模拟来模拟定时器秒针的移动,从而实现了非活跃连接在 N 秒后释放的功能。
TimerQueue 类的代码实现如下:
class EventLoop;
/*时间轮*/
class TimerWheel {
using TaskFunc = std::function<void()>;
using TaskPtr = std::shared_ptr<TimerTask>;
using TaskWeak = std::weak_ptr<TimerTask>;
private:
/*SetRelease回调函数 -- 从unordered_map中将定时任务信息移除*/
void RemoveTimer(uint64_t id) {
auto it = _timers.find(id);
if(it != _timers.end())
_timers.erase(it);
}
/*创建定时器*/
static int CreateTimerFd() {
// 创建定时器
int timerfd = timerfd_create(CLOCK_MONOTONIC, 0);
if(timerfd == -1) {
LOG(FATAL, "timerfd create failed: %s", strerror(errno));
abort();
}
// 设置首次超时时间 -- 1s
struct itimerspec itime;
itime.it_value.tv_sec = 1;
itime.it_value.tv_nsec = 0;
// 设置后续超时时间间隔 -- 1s
itime.it_interval.tv_sec = 1;
itime.it_interval.tv_nsec = 0;
// 启动定时器
int n = timerfd_settime(timerfd, 0, &itime, NULL);
if (n == -1) {
LOG(FATAL, "timerfd settime failed: %s", strerror(errno));
abort();
}
return timerfd;
}
/*定时器可读事件回调*/
uint64_t ReadTimerFd() {
// 读取timerfd中的内容 -- 使其计数器变为0(8字节)
uint64_t times;
int n = read(_timer_fd, ×, 8);
if (n < 0) {
// 被信号中断或者非阻塞情况下无数据可读取
if (errno == EAGAIN || errno == EINTR) {
LOG(DEBUG, "read timerfd EAGAIN or EINTR");
return 0;
}
LOG(FATAL, "read timerfd failed: %s", strerror(errno));
abort();
}
return times;
}
/*执行定时任务 -- 此函数应一秒被执行一次,相当于秒针向后移动一步*/
void RunTimerTask() {
_tick = (_tick + 1) % _capacity;
// 将秒针对应位置的定时任务队列清空,即释放定时任务对象的shared_ptr
_wheel[_tick].clear();
}
/*定时任务达到超时时间 -- 每秒钟超时一次*/
void OnTime() {
// 获取定时器超时次数
uint64_t times = ReadTimerFd();
// 执行定时任务
for (uint64_t i = 0; i < times; i++) {
RunTimerTask();
}
}
/*在EventLoop对应的线程中添加定时任务*/
void AddTimerInLoop(uint64_t id, uint32_t timeout, const TaskFunc &cb) {
// 实例化定时任务对象并交由智能指针管理
TaskPtr tp(new TimerTask(id, timeout, cb));
// 为了不增加引用计数,这里不能使用shared_ptr
_timers[id] = TaskWeak(tp);
tp->SetRelease(std::bind(&TimerWheel::RemoveTimer, this, id));
// 将定时任务对象的shared_ptr添加到时间轮中
int pos = (_tick + timeout) % _capacity;
_wheel[pos].push_back(tp);
}
/*在EventLoop对应的线程中刷新/延迟定时任务*/
void ReFlushTimerInLoop(uint64_t id) {
// 通过定时任务的weak_ptr构造一个shared_ptr
auto it = _timers.find(id);
if(it == _timers.end()) return;
TaskPtr tp = (it->second).lock();
// 获取超时时间
uint32_t timeout = tp->Timeout();
// 将shared_ptr添加到时间轮中
int pos = (_tick + timeout) % _capacity;
_wheel[pos].push_back(tp);
}
/*在EventLoop对应的线程中取消定时任务*/
void CancelTimerInLoop(uint64_t id) {
// 通过定时任务的weak_ptr构造一个shared_ptr
auto it = _timers.find(id);
if(it == _timers.end()) return;
TaskPtr tp = (it->second).lock();
// 通过shared_ptr设置定时任务的取消状态
if (tp) tp->Cancel();
}
public:
TimerWheel(EventLoop *loop)
: _capacity(60), _tick(0), _wheel(_capacity), _timer_fd(CreateTimerFd()), _loop(loop), _timerfd_channel(new Channel(_loop, _timer_fd)) {
// 设置定时器可读事件回调 -- 读取timerfd计数器,执行定时任务
_timerfd_channel->SetReadCallback(std::bind(&TimerWheel::OnTime, this));
// 启动定时器可读事件监控
_timerfd_channel->EnableRead();
}
/*判断某个定时任务是否已经存在 -- 存在线程安全问题*/
// 由于HasTimer的返回值是bool类型,所以无法像Timeradd等函数一样将其放入到EventLoop的任务队列中去执行
// 所以存在线程安全问题,因此我们在调用此函数时需要保证是在EventLoop对应的线程中调用的
bool HasTimer(uint64_t id) {
auto it = _timers.find(id);
if (it == _timers.end()) return false;
return true;
}
/*为了保证添加/刷新/取消定时器任务的线程安全,我们需要在EventLoop对应的线程中去执行它们*/
// 同时,这里也和Channel::Remove Channel::Update函数一样,我们使用了EventLoop中的成员函数
// 所以,下面只给出声明,函数的具体实现需要放到EventLoop类下面
/*添加定时任务*/
void AddTimer(uint64_t id, uint32_t timeout, const TaskFunc &cb);
/*刷新/延迟定时任务*/
void ReFlushTimer(uint64_t id);
/*取消定时任务*/
void CancelTimer(uint64_t id);
private:
int _capacity; // 容量,代表最大超时时间
int _tick; // 当前的秒针,指向哪里就执行哪里的定时任务
std::vector<std::vector<TaskPtr>> _wheel; // 存放定时任务智能指针的时间轮
std::unordered_map<uint64_t, TaskWeak> _timers; // 定时器任务id与指针之间的关联关系
int _timer_fd; // 超时事件描述符
EventLoop *_loop; // 描述符事件监控与处理句柄
std::unique_ptr<Channel> _timerfd_channel; // 描述符监控事件管理句柄
};
/*TimerWheel::AddTimer、TimerWheel::ReFlushTimer和TimerWheel::CancelTimer函数的实现*/
/*添加定时任务*/
void TimerWheel::AddTimer(uint64_t id, uint32_t timeout, const TaskFunc &cb) {
return _loop->RunInLoop(std::bind(&TimerWheel::AddTimerInLoop, this, id, timeout, cb));
}
/*刷新/延迟定时任务*/
void TimerWheel::ReFlushTimer(uint64_t id) {
return _loop->RunInLoop(std::bind(&TimerWheel::ReFlushTimerInLoop, this, id));
}
/*取消定时任务*/
void TimerWheel::CancelTimer(uint64_t id) {
return _loop->RunInLoop(std::bind(&TimerWheel::CancelTimerInLoop, this, id));
}
7. Connection 模块
Connection 模块的设计思想如下:
具体实现时的一些细节如下:
- Connection 模块是对连接进行全方位管理的一个模块,而管理具体包括套接字的管理 – 使连接能够进行套接字的操作,连接事件的管理 – 包括可读,可写,错误,挂断以及任意事件,缓冲区的管理 – 便于 socket 数据的接收和发送,协议上下文的管理 – 用于记录请求数据的处理过程,以及回调函数的管理 – 提供连接建立完成、接收新数据、连接关闭、任意事件的回调函数设置接口,让组件使用者能够根据需要进行设置。
- Connection 模块需要提供数据发送接口,但这并不是真正的发送接口,而只是把数据放到用户态发送缓冲区,然后描述符启动写事件监控,待到 socket 缓冲区可写后再真正发送数据;同样,关闭连接接口也并不是直接关闭连接,而应该在实际释放连接之前,看看输入输出缓冲区中是否有数据待处理,有则处理后再真正关闭连接;最后,一个连接接收到数据后应该如何进行业务处理,取决于上下文以及数据的业务处理回调函数,即上层协议,而切换协议接口的作用就是更改协议对应的上下文以及各种回调函数 (通用容器 Any)。
- 由于对连接的所有操作都是通过 Connection 模块来完成的,因此可能出现对连接进行某种操作的时候,Connection 对象已经被释放的场景,从而造成内存访问错误,导致程序崩溃 (虽然其他线程中对连接的所有操作都会被放入任务队列中,最后在连接对应的 EventLoop 关联的线程中去执行,但是任务队列中任务的执行也存在先后顺序);因此我们使用 shared_ptr 对 Connection 对象进行管理,然后在任意一个地方对 Connection 对象进行操作的时候都保存一份 shared_ptr,这样就算其他地方进行了释放操作,也只是将 shared_ptr 的计数器 -1,而不会导致 Connection 的实际释放。
Connection 类的代码实现如下:
/*通信连接的状态*/
typedef enum {
DISCONNECTED, // 连接已断开状态
CONNECTING, // 连接建立成功,但连接的各种状态待设置(Channel事件回调、处理上下文等)
CONNECTED, // 连接建立完成状态
DISCONNECTING,// 连接半关闭状态 -- 接收或发送缓冲区中有数据待处理
} ConnStatu;
/*通信连接管理类*/
class Connection : public std::enable_shared_from_this<Connection> {
// 通信连接的智能指针 -- 最开始的智能指针(引用计数为1)由服务器模块创建
using ConnectionPtr = std::shared_ptr<Connection>;
// 组件使用者设置给服务器模块,服务器模块获取新的通信连接后再设置给通信连接模块的回调函数类型
using ConnectedFunc = std::function<void(const ConnectionPtr&)>;
using MessageFunc = std::function<void(const ConnectionPtr&, Buffer *)>;
using CloseFunc = std::function<void(const ConnectionPtr&)>;
using EventFunc = std::function<void(const ConnectionPtr&)>;
private:
/*Channel模块各种事件的回调函数*/
/*设置描述符可读事件回调 -- 真正的数据接收接口*/
void HandleRead() {
// 接收socket中的数据
char buf[65536]; // 64KB
ssize_t n = _socket.Recv(&buf, sizeof(buf));
// 接收数据出错不能直接释放连接,因为输出缓冲区可能还有数据待发送,而是通过ShutDownInLooop()将连接设置为待关闭状态
if (n < 0) { return ShutDownInLoop(); }
// 将数据写入到输入缓冲区
_inbuffer.Write(buf, n);
// 调用_Message_cb()进行业务处理
// shared_from_this() -- 从当前对象自身获取自身的shared_ptr管理对象,使用时要求类继承enable_shared_from_this<class T>模板类
if (_inbuffer.ReadAbleSize() > 0)
if (_message_cb) _message_cb(shared_from_this(), &_inbuffer);
}
/*设置描述符可写事件回调 -- 真正的数据发送接口*/
void HandleWrite() {
// 发送发送缓冲区中的数据
ssize_t n = _socket.Send(_outbuffer.ReadStartAddr(), _outbuffer.ReadAbleSize());
if (n < 0) {
// 发送失败观察输入缓冲区中是否有数据待处理,有则处理
// 没有则将释放连接操作压入到任务队列中,等待全部就绪事件业务处理完毕后再释放连接
if (_inbuffer.ReadAbleSize() > 0)
if (_message_cb) _message_cb(shared_from_this(), &_inbuffer);
ReleaseToQueue();
}
// 移动输出缓冲区的数据读取位置
_outbuffer.MoveReadOffset(n);
// 如果输出缓冲区中没有数据待发送了,则关闭写事件监控
if (_outbuffer.ReadAbleSize() == 0) {
_channel.DisableWrite();
// 如果连接是待关闭状态且有数据待发送,此时数据已发送完毕,将释放连接操作压入任务队列
if (_statu == DISCONNECTING) return ReleaseToQueue();
}
}
/*设置描述符挂断事件事件回调*/
void HandleClose() {
// 描述符挂断,则套接字发送与接收数据都失效,此时只能处理一下inbuffer中可能存在的未处理数据
if (_inbuffer.ReadAbleSize() > 0)
if (_message_cb) _message_cb(shared_from_this(), &_inbuffer);
// 将释放连接操作压入到任务队列中,等待全部就绪事件业务处理完毕后再释放连接
ReleaseToQueue();
}
/*设置描述符错误事件回调*/
void HandleError() { return HandleClose(); }
/*设置描述符任意事件回调*/
void HandleEvent() {
// 刷新连接活跃度(延迟定时任务销毁时间)
if (_enable_inactive_release) _loop->ReFlushTimer(_conn_id);
// 执行组件使用者->服务器模块设置的任意事件回调
if (_event_cb) _event_cb(shared_from_this());
}
/*为保证线程安全,需要在EventLoop对应的线程中完成各种操作*/
/*连接建立成功后,进行各种状态设置*/
void EstablishedInLoop() {
// 修改连接状态
assert(_statu == CONNECTING);
_statu = CONNECTED;
// 启动读事件监控
_channel.EnableRead();
// 调用组件使用者->服务器模块设置的连接建立完成回调
if (_connected_cb) _connected_cb(shared_from_this());
}
/*实际的通信连接释放接口*/
void ReleaseInLoop() {
// 修改连接状态
_statu = DISCONNECTED;
// 移除描述符事件监控
_channel.Remove();
// 关闭描述符
_socket.Close();
// 如果有定时任务,则取消定时任务
if (_loop->HasTimer(_conn_id)) DisableInactiveReleaseInLoop();
// 调用组件使用者设置的以及服务器模块设置的连接关闭回调函数
if (_close_cb) _close_cb(shared_from_this());
if (_server_close_cb) _server_close_cb(shared_from_this());
}
/*发送数据 -- 不是真正的发生数据接口,只是将数据放到发送缓冲区,然后启动可写事件监控*/
/*真正的数据发生是触发可写事件后在Channel可写事件回调HandleWrite中进行的*/
void SendInLoop(const char data[], size_t len) {
if (_statu == DISCONNECTED) return;
// 将数据放到发送缓冲区中
_outbuffer.Write(data, len);
// 启动写事件监控
if (_channel.Writeable() == false) _channel.EnableWrite();
}
/*将释放连接操作压入到任务队列中,等待全部就绪事件业务处理完毕后再释放连接,避免业务处理超时导致连接被释放*/
void ReleaseToQueue() { return _loop->QueueInLoop(std::bind(&Connection::ReleaseInLoop, this)); }
/*关闭通信连接 -- 不是真正的连接释放接口,只是修改连接状态为DISCONNECTING*/
void ShutDownInLoop() {
// 修改连接为半关闭状态
_statu = DISCONNECTING;
// 判断是输入缓冲区中是否有数据待处理
if (_inbuffer.ReadAbleSize() > 0)
if (_message_cb) _message_cb(shared_from_this(), &_inbuffer);
// 判断输出缓冲区中是否有数据待发送
if (_outbuffer.ReadAbleSize() > 0)
if (_channel.Writeable() == false) _channel.EnableWrite();
// 如果输出缓冲区中无数据,则将释放连接操作压入到任务队列中,等待全部就绪事件的业务处理完毕后再释放连接
if (_outbuffer.ReadAbleSize() == 0) ReleaseToQueue();
}
/*启动非活跃连接定时销毁*/
void EnableInactiveReleaseInLoop(uint32_t sec) {
_enable_inactive_release = true;
// 定时任务存在则刷新,不存在则添加
if (_loop->HasTimer(_conn_id)) _loop->ReFlushTimer(_conn_id);
_loop->AddTimer(_conn_id, sec, std::bind(&Connection::ReleaseToQueue, this));
}
/*取消非活跃连接定时销毁*/
void DisableInactiveReleaseInLoop() {
_enable_inactive_release = false;
// 定时任务存在则取消
if (_loop->HasTimer(_conn_id)) _loop->CancelTimer(_conn_id);
}
/*切换协议 -- 重置上下文以及组件使用者的阶段性处理函数*/
void UpgradeInLoop(const Any &context, const ConnectedFunc &conn_cb, const MessageFunc &msg_cb,
const CloseFunc &close_cb, const EventFunc &event_cb) {
_context = context;
_connected_cb = conn_cb;
_message_cb = msg_cb;
_close_cb = close_cb;
_event_cb = event_cb;
}
public:
Connection(EventLoop *loop, uint64_t conn_id, int sockfd)
: _conn_id(conn_id), _sockfd(sockfd), _socket(_sockfd), _enable_inactive_release(false),
_statu(CONNECTING), _loop(loop), _channel(_loop, _sockfd) {
// Channel模块各种回调函数的设置
_channel.SetReadCallback(std::bind(&Connection::HandleRead, this));
_channel.SetWriteCallback(std::bind(&Connection::HandleWrite, this));
_channel.SetEventCallback(std::bind(&Connection::HandleEvent, this));
_channel.SetCloseCallback(std::bind(&Connection::HandleClose, this));
_channel.SetErrorCallback(std::bind(&Connection::HandleError, this));
}
/*获取通信连接id*/
uint64_t Id() { return _conn_id; }
/*获取连接描述符*/
int Fd() { return _sockfd; }
/*判断通信连接是否已经建立完成,可以直接通信*/
bool IsConnected() { return _statu == CONNECTED; }
/*设置处理上下文*/
void SetContext(const Any &context) { _context = context; }
/*获取处理上下文*/
Any *GetContext() { return &_context; }
/*组件使用者设置给服务器模块,服务器模块获取新的通信连接后再设置给通信连接模块的回调函数*/
void SetConnectedCallback(const ConnectedFunc &connected_cb) { _connected_cb = connected_cb; }
void SetMessageCallback(const MessageFunc &mssgage_cb) { _message_cb = mssgage_cb; }
void SetCloseCallback(const CloseFunc &close_cb) { _close_cb = close_cb; }
void SetEventCallback(const EventFunc &event_cb) { _event_cb = event_cb; }
/*服务器模块设置的连接关闭回调函数*/
void SetServerCloseCallback(const CloseFunc &server_close_cb) { _server_close_cb = server_close_cb; }
/*连接建立成功后进行各种状态设置 -- 启动可读事件监控、_connected_cb()回调等*/
void Established() { _loop->RunInLoop(std::bind(&Connection::EstablishedInLoop, this)); }
/*发送数据*/
void Send(const char data[], size_t len) { _loop->RunInLoop(std::bind(&Connection::SendInLoop, this, data, len)); }
/*关闭通信连接*/
void ShutDown() { _loop->RunInLoop(std::bind(&Connection::ShutDownInLoop, this)); }
/*启动非活跃连接定时销毁*/
void EnableInactiveRelease(uint32_t sec) { _loop->RunInLoop(std::bind(&Connection::EnableInactiveReleaseInLoop, this, sec)); }
/*取消非活跃连接定时销毁*/
void DisableInactiveRelease() { _loop->RunInLoop(std::bind(&Connection::DisableInactiveReleaseInLoop, this)); }
/*切换协议 -- 更新上下文以及阶段性处理函数*/
void Upgrade(const Any &context, const ConnectedFunc &conn_cb, const MessageFunc &msg_cb,
const CloseFunc &close_cb, const EventFunc &event_cb)
{
// 注意:Upgrage接口必须在EventLoop对应的线程中立即被执行,即当前线程必须就是EventLoop对应的线程,
// 这样在RunInLoop函数中UpgradeInLoop函数才是直接被执行,而不是加入到任务队列中
// 否则会导致当UpgradeInLoop函数未被执行,但有新的事件触发时,这个新事件按照原来的协议进行处理
_loop->AssertInLoop();
_loop->RunInLoop(std::bind(&Connection::UpgradeInLoop, this, context, conn_cb, msg_cb, close_cb, event_cb));
}
private:
uint64_t _conn_id; // 唯一通信ID,同时也作为定时器ID使用
int _sockfd; // 连接描述符
Socket _socket; // 套接字模块
bool _enable_inactive_release; // 是否启动非活跃连接销毁标志
ConnStatu _statu; // 通信连接的状态
EventLoop *_loop; // 连接关联的EventLoop
Channel _channel; // 描述符监控事件管理模块
Buffer _inbuffer; // 用户输入缓冲区
Buffer _outbuffer; // 用户输出缓冲区
Any _context; // 请求接收处理上下文
/*下面的四个回调函数是由组件使用者设置给服务器模块,服务器模块获取新的通信连接后再设置给通信连接模块的*/
ConnectedFunc _connected_cb; // 通信连接建立成功的回调
MessageFunc _message_cb; // 接收到消息的回调
CloseFunc _close_cb; // 通信连接关闭的回调
EventFunc _event_cb; // 任意事件的回调
/*最后的这个回调函数是服务器模块的连接关闭回调,因为通信连接智能指针最开始会在服务器模块中创建*/
/*因此连接关闭时服务器模块的智能指针对象也需要释放,否则引用计数永远不可能为0,通信连接也就不可能真正被释放*/
CloseFunc _server_close_cb; // 服务器模块的连接关闭回调
};
8. Acceptor 模块
Acceptor 模块的设计思想如下:
具体实现时的一些细节如下:
- 由于 Acceptor 仅对监听套接字进行管理,所以它的设计流程很简单:
- 创建一个监听套接字用于监听客户端连接。
- 启动监听套接字的可读事件监控。
- 当可读事件触发后获取客户端新连接。
- 调用新连接获取成功后的回调函数,为新连接创建 Connection 对象进行管理。
- 需要注意的是,服务器监听到一个新的客户端连接后,应该为新连接创建 Connection 对象,但由于 Acceptor 模块只对监听套接字进行管理,所以获取到新的客户端连接后需要由服务器模块对其进行处理,比如为其创建 Connection 对象,设置各种回调函数,因此 Acceptor 模块中仅有一个服务器模块设置的获取到新连接后的回调函数。
Acceptor 类的代码实现如下:
/*监听套接字事件监控与处理类*/
class Acceptor {
using AcceptCallback = std::function<void(int)>;
private:
/*创建一个服务器连接并返回监听套接字*/
int CreateServer(uint16_t port, const std::string &ip, bool non_block) {
bool ret = _socket.CreateServer(port, ip, non_block);
if (ret == false) {
// 监听套接字创建失败,直接终止程序
LOG(FATAL, "CreateServer failed");
abort();
}
return _socket.Fd();
}
/*监听套接字的可读事件回调*/
void HandleRead() {
// 获取新连接
int newfd = _socket.Accept();
// 对新连接进行处理 -- 创建Connection对象等
if (newfd < 0) return;
if (_accept_cb) _accept_cb(newfd);
}
public:
Acceptor(EventLoop *loop, uint16_t port, const std::string &ip = "0.0.0.0", bool non_block = true)
: _socket(CreateServer(port, ip, non_block)), _loop(loop), _channel(loop, _socket.Fd()) {
// 设置监听套接字可读事件回调
_channel.SetReadCallback(std::bind(&Acceptor::HandleRead, this));
// 注意,启动读事件监控不能放在构造函数中,因为此时通信连接回调函数还未设置,可能会出现获取了新连接但得不到处理的情况
}
/*设置通信连接处理回调函数 -- 为获取到的新连接创建Connetion对象*/
void SetAcceptCallback(const AcceptCallback &accept_cb) { _accept_cb = accept_cb; }
/*启动读事件监控*/
void EnableRead() { _channel.EnableRead(); }
private:
Socket _socket; // 套接字模块
EventLoop *_loop; // 描述符事件监控与处理模块
Channel _channel; // 描述符监控事件管理模块
AcceptCallback _accept_cb; // 对新的通信连接进行处理的回调函数
};
9. LoopThread 模块
我们上面在设计 EventLoop 模块时提到 EventLoop 模块与线程是一一对应的,并且由于 EventLoop 模块在构造时就会使用当前线程 id 来作为 EventLoop 对象所关联的线程的 id – _thread_id(std::this_thread::get_id());同时,我们后面在运行一个操作的时候判断当前是否运行在 EventLoop 模块对应的线程中,就是将线程 ID 与 EventLoop 模块中的 _thread id 进行比较,相同表示在同一个线程,不同表示当前运行线程并不是 EventLoop 线程。
因此,EventLoop 模块必须在线程内部实例化,即先为 EventLoop 对象创建一个线程,然后在该线程的入口函数中去实例化 EventLoop 对象,这样该线程就会与 EventLoop 对象相关联 (实例化时该线程 id 被用于初始化 EventLoop 对象的 _thread_id)。
需要注意的是,我们不能事先创建多个 EventLoop 对象,然后创建多个线程,最后将各个线程的 id 重新赋值给 EventLoop 进行关联,因为这样在构造 EventLoop 对象到设置新的 _thread_id 期间,EventLoop 产生的操作将是不可控的。
基于以上思想,我们需要构建一个 LoopThread 模块,这个模块的功能是将 EventLoop 与 thread 整合到一起,向外部返回所实例化的 EventLoop 对象。
EventLoop 类的代码实现如下:
/*为每个通信连接创建从线程,在从线程中实例化连接对应的EventLoop对象,完成描述符事件监控与处理*/
class LoopThread {
/*线程入口函数*/
void ThreadEntry() {
// 创建EventLoop对象 -- 这里创建局部对象即可,不必new,因为_loop.start是死循环运行的
EventLoop loop;
{
std::unique_lock<std::mutex> lock(_mutex);
_loop = &loop;
// 唤醒可能正在阻塞的GetLoop操作
_cond.notify_all();
}
// 运行EventLoop模块 -- 获取并处理活跃连接,执行任务
_loop->Start();
}
public:
LoopThread() : _loop(nullptr), _thread(std::thread(&LoopThread::ThreadEntry, this)) {}
/*获取EventLoop对象指针*/
EventLoop *GetLoop() {
EventLoop *loop;
{
std::unique_lock<std::mutex> lock(_mutex);
// 如果_loop为空说明EventLoop对象还未实例化,则阻塞等待条件成立,即_loop不为空
_cond.wait(lock, [&](){ return _loop != nullptr; });
loop = _loop;
}
return loop;
}
private:
// 互斥锁和条件变量用于实现获取_loop指针的同步关系,避免线程创建了但EventLoop还未实例化时获取_loop指针
std::mutex _mutex; // 互斥锁
std::condition_variable _cond; // 条件变量
EventLoop *_loop; // EventLoop指针变量,需要在线程内部进行实例化
std::thread _thread; // EventLoop对应的线程
};
10. LoopThreadPool 模块
我们上面针对 EventLoop 设计 LoopThread 模块,由于客户端连接有多个,而每一个客户端连接都对应一个 Connection 模块、EventLoop 模块以及 LoopThread 模块,因此我们需要针对 LoopThread 设计一个线程池 – LoopThreadPool,用于对所有的 LoopThread 进行管理及分配。
LoopThreadPool 模块所要完成的功能如下:
-
线程数量可配置 (0个或多个)。
需要注意的是,在服务器中,由于主从 Reactor 模型是主线程只负责新连接获取,从属线程负责新连接的事件监控及处理,因此当前的线程池中从属线程的数量有可能会为0,也就是实现单 Reactor 服务器,仅有一个线程,其即负责获取连接,也负责连接的处理。
-
对所有的线程进行管理 – 管理0个或多个 LoopThread 对象.
-
提供线程分配的功能 – 当主线程获取了一个新连接时,将新连接挂到从属线程上进行事件监控及处理。
假设有0个从属线程,则直接分配给主线程的 EventLoop 进行处理;假设有多个从属线程,则采用 RR 轮转思想,进行线程的分配 (将被选择线程的 EventLoop 对象获取到,然后设置给对应的 Connection 对象)
LoopThreadPool 类的代码实现如下:
/*LoopThread线程池,用于管理及分配所有的LoopThread*/
class LoopThreadPool {
public:
LoopThreadPool(EventLoop *main_loop) : _thread_count(0), _loop_idx(0), _main_loop(main_loop) {}
/*设置从线程数量*/
void SetThreadCount(uint32_t count) {
_thread_count = count;
_threads.resize(_thread_count);
_loops.resize(_thread_count);
}
/*创建所有的从属LoopThread线程*/
void CreateThreads() {
if (_thread_count == 0) return;
for (size_t i = 0; i < _thread_count; i++) {
_threads[i] = new LoopThread();
_loops[i] = _threads[i]->GetLoop();
}
}
/*为通信连接分配EventLoop*/
EventLoop *GetLoop() {
// 如果_thread_count为0,即单Reactor模型,则全部连接的全部事件都在主EventLoop中完成
if (_thread_count == 0) return _main_loop;
// 如果是主从Reactor模型,则为每个连接分配一个EventLoop,连接的所有事件都在对应的EventLoop中完成
EventLoop *loop = _loops[_loop_idx];
_loop_idx = (_loop_idx + 1) % _thread_count;
return loop;
}
private:
uint32_t _thread_count; // 从线程数量
int _loop_idx; // 当前未分配的EventLoop的下标
EventLoop *_main_loop; // 主EventLoop(主Reactor主线程),当从线程数量为0时连接的所有操作都在主线程中执行
std::vector<LoopThread*> _threads; // 保存所有的从LoopThread
std::vector<EventLoop*> _loops; // 保存所有的EventLoop,当从线程数量大于0时从_threads中为EventLoop分配线程
};
11. TcpServer 模块
TcpServer 模块的设计思想如下:
TCP 服务器的运行流程如下:
- 在 TcpServer 中实例化一个 Acceptor 对象,以及一个 EventLoop 对象 (main_loop)。
- 将 Acceptor 挂到 main_loop 上进行事件监控。
- 一旦 Acceptor 对象就绪了可读事件,则执行读事件回调函数获取新建连接。
- 为新连接创建一个 Connection 进行管理。
- 对连接对应的 Connection 设置各种功能回调 (连接完成回调,消息回调,关闭回调,任意事件回调)。
- 启动 Connection 的非活跃连接超时销毁功能。
- 将新连接对应的 Connection 挂到 LoopThreadPool 中的从属线程对应的 Eventloop 中进行事件监控。
- 一旦 Connection 对应的连接就绪了事件,则执行事件对应的回调函数进行处理,处理完毕后调用 TcpServer 对应的回调函数。
TcpServer 类的代码实现如下:
/*整合封装服务器模块*/
class TcpServer {
// 通信连接的智能指针 -- 最开始的智能指针(引用计数为1)
using ConnectionPtr = std::shared_ptr<Connection>;
// 组件使用者设置给服务器模块的各种事件回调
using ConnectedFunc = std::function<void(const ConnectionPtr&)>;
using MessageFunc = std::function<void(const ConnectionPtr&, Buffer *)>;
using CloseFunc = std::function<void(const ConnectionPtr&)>;
using EventFunc = std::function<void(const ConnectionPtr&)>;
// 任务类型
using TaskFunc = std::function<void()>;
private:
/*在EventLoop对应的线程中添加定时任务*/
void AddTimerInLoop(const TaskFunc &cb, uint32_t timeout) {
_main_loop.AddTimer(_conn_id, timeout, cb);
++_conn_id;
}
/*为获取到的新连接创建Connection对象并管理其shared_ptr*/
void CreateConnection(int fd) {
ConnectionPtr conn(new Connection(_pool.GetLoop(), _conn_id, fd));
// 为通信连接设置各种用户层的事件回调
conn->SetConnectedCallback(_connected_cb);
conn->SetMessageCallback(_message_cb);
conn->SetCloseCallback(_close_cb);
conn->SetEventCallback(_event_cb);
// 设置服务器模块的连接关闭回调 -- 释放Connection对象
conn->SetServerCloseCallback(std::bind(&TcpServer::RemoveConncetion, this, std::placeholders::_1));
// 启动连接非活跃超时销毁
if (_enable_inactive_release) conn->EnableInactiveRelease(_timeout);
// 完成连接建立成功后的各种初识化工作
conn->Established();
// 管理通信连接
_conns[_conn_id] = conn;
++_conn_id;
}
/*在EventLoop对应的线程中移除_conns中保存的Connection对象的shared_ptr*/
void RemoveConncetionInLoop(const ConnectionPtr &conn) {
uint64_t conn_id = conn->Id();
auto it = _conns.find(conn_id);
if (it != _conns.end()) _conns.erase(it);
}
/*通信连接关闭时移除_conns中保存的最后一份Connection对象的shared_ptr,从而释放掉连接对应的Connection对象*/
void RemoveConncetion(const ConnectionPtr &conn) {
return _main_loop.RunInLoop(std::bind(&TcpServer::RemoveConncetionInLoop, this, conn));
}
public:
TcpServer(uint16_t port, const std::string &ip = "0.0.0.0", bool non_block = true)
: _conn_id(0), _port(port), _enable_inactive_release(false),
_acceptor(&_main_loop, port, ip, non_block), _pool(&_main_loop) {
// 设置通信连接回调函数 -- 为通信连接创建Connection对象
_acceptor.SetAcceptCallback(std::bind(&TcpServer::CreateConnection, this, std::placeholders::_1));
}
/*设置从属线程池线程数量*/
void SetThreadCount(uint32_t count) { _pool.SetThreadCount(count); }
/*启动非活跃连接定时销毁任务*/
void EnableInactiveRelease(uint32_t timeout) {
_timeout = timeout;
_enable_inactive_release = true;
}
/*取消非活跃连接定时销毁*/
void DisableInactiveRelease() { _enable_inactive_release = false; }
/*添加定时任务*/
void AddTimer(const TaskFunc &cb, uint32_t timeout) {
return _main_loop.RunInLoop(std::bind(&TcpServer::AddTimerInLoop, this, cb, timeout));
}
/*启动服务器*/
void Start() {
// 创建从属EventLoop线程池
_pool.CreateThreads();
// 启动监听套接字的读事件监控(并不是开始监听,只是添加EPOLLIN事件)
_acceptor.EnableRead();
// 开始监听并获取活跃连接、处理活跃连接的就绪事件以及执行任务
_main_loop.Start();
}
/*组件使用者设置给服务器模块的各种事件回调*/
void SetConnectedCallback(const ConnectedFunc &connected_cb) { _connected_cb = connected_cb; }
void SetMessageCallback(const MessageFunc &mssgage_cb) { _message_cb = mssgage_cb; }
void SetCloseCallback(const CloseFunc &close_cb) { _close_cb = close_cb; }
void SetEventCallback(const EventFunc &event_cb) { _event_cb = event_cb; }
private:
uint64_t _conn_id; // 自增通信连接ID
uint16_t _port; // 监听端口
uint32_t _timeout; // 定时任务的超时时间
bool _enable_inactive_release; // 是否启动非活跃连接定时销毁任务的标志
EventLoop _main_loop; // 主EventLoop -- 主Reactor主线程,实现对监听套接字的事件监控
Acceptor _acceptor; // 监听套接字管理模块
LoopThreadPool _pool; // 从属EventLoop线程池 -- 从Reactor从线程,实现对通信连接的事件监控
std::unordered_map<uint64_t, ConnectionPtr> _conns; // 保存管理通信连接对象的shared_ptr指针
/*组件使用者设置给服务器模块的各种事件回调*/
ConnectedFunc _connected_cb; // 通信连接建立成功的回调
MessageFunc _message_cb; // 接收到消息的回调
CloseFunc _close_cb; // 通信连接关闭的回调
EventFunc _event_cb; // 任意事件的回调
};
12. TcpServer 简单测试
我们上面已经完成了 TcpServer 服务器的开发,现在我们使用它来搭建一个简单的 echo 回显服务器用于测试,观察程序的运行结果是否符合预期。同时,为了能够更好的观察程序的运行逻辑以及便于我们纠错,我们使用C语言的 strftime、fprintf 函数,__VA_ARGS __ 可变参数以及 __FILE__、__LINE__ 宏来实现一个日志宏函数。
日志宏:
/*日志等级*/
enum {
NORMAL,
DEBUG,
ERROR,
FATAL
};
/*将日志等级转化为字符串*/
const char* level_to_stirng(int level) {
switch (level)
{
case NORMAL:
return "NORMAL";
case DEBUG:
return "DEBUG";
case ERROR:
return "ERROR";
case FATAL:
return "FATAL";
default:
return nullptr;
}
}
/*日志宏函数*/
#define LOG(level, format, ...) do {\
const char* levelstr = level_to_stirng(level); /*日志等级*/\
time_t ts = time(NULL); /*时间戳*/\
struct tm *lt = localtime(&ts); /*格式化时间*/\
char timebuf[32] = { 0 };\
strftime(timebuf, sizeof(timebuf) - 1, "%y-%m-%d %H:%M:%S", lt); /*格式化时间到字符串*/\
fprintf(stdout, "[%s][%p][%s][%s:%d] " format "\n", levelstr, (void*)pthread_self(), timebuf, __FILE__, __LINE__, ##__VA_ARGS__); /*##解除必须传递可变参数的限制*/\
} while(0)
echoServer.hpp:
#ifndef __ECHOSERVER_HPP__
#define __ECHOSERVER_HPP__
#include "../server.hpp"
/*使用TcpServer服务器组件搭建一个回显服务器demo*/
class EchoServer {
using ConnectionPtr = std::shared_ptr<Connection>;
private:
void OnConnected(const ConnectionPtr &conn) {
LOG(DEBUG, "Connection Created Success: %p", conn.get());
}
void OnClose(const ConnectionPtr &conn) {
LOG(DEBUG, "Connection Released Success: %p", conn.get());
}
void OnMessage(const ConnectionPtr &conn, Buffer *buf) {
// 读取输入缓冲区中的数据
size_t size = buf->ReadableSzie();
char data[size];
buf->Read(data, size);
// 业务处理,构建响应 -- 这里简单的回显即可
// 回复响应 -- 将响应写入到输出缓冲区
conn->Send(data, size);
}
public:
EchoServer(uint16_t port) : _port(port), _server(_port) {
_server.SetThreadCount(2);
_server.EnableInactiveRelease(10);
_server.SetConnectedCallback(std::bind(&EchoServer::OnConnected, this, std::placeholders::_1));
_server.SetCloseCallback(std::bind(&EchoServer::OnClose, this, std::placeholders::_1));
_server.SetMessageCallback(std::bind(&EchoServer::OnMessage, this, std::placeholders::_1, std::placeholders::_2));
}
void Start() { _server.Start(); }
private:
uint16_t _port;
TcpServer _server;
};
#endif
tcpClient.cc:
#include "../source/server.hpp"
int main()
{
// 创建客户端连接
Socket clientSock;
clientSock.CreateClient(8082, "127.0.0.1");
for (int i = 0; i < 5; i++) {
// 向服务器发送数据
std::string msg = "hello server";
clientSock.Send(msg.c_str(), msg.size());
// 接收服务器响应
char buffer[1024];
ssize_t n = clientSock.Recv(buffer, sizeof(buffer) - 1);
buffer[n] = 0;
LOG(DEBUG, "[echo]# %s", buffer);
sleep(1);
}
while(1) sleep(1);
clientSock.Close();
return 0;
}
main.cc:
#include "echoServer.hpp"
int main()
{
EchoServer server(8082);
server.Start();
}
测试结果如下:
下面我们使用 WebBench 对服务器 (2核2G) 进行简单的性能测试 (注:由于云服务器的带宽非常小,所以这里我们选择使用本主机来进行测试,即忽略带宽影响,但这种测试严格来说是不合理的,降低了服务器效率):
五、HTTP 协议模块开发
1. Util 模块
Util 模块的设计思想如下:
Util 类的代码实现如下:
/*HTTP响应状态码对应的描述信息*/
std::unordered_map<int, std::string> g_statuMsg = {
{100, "Continue"},
{101, "Switching Protocol"},
{102, "Processing"},
{103, "Early Hints"},
{200, "OK"},
{201, "Created"},
{202, "Accepted"},
{203, "Non-Authoritative Information"},
{204, "No Content"},
{205, "Reset Content"},
{206, "Partial Content"},
{207, "Multi-Status"},
{208, "Already Reported"},
{226, "IM Used"},
{300, "Multiple Choice"},
{301, "Moved Permanently"},
{302, "Found"},
{303, "See Other"},
{304, "Not Modified"},
{305, "Use Proxy"},
{306, "unused"},
{307, "Temporary Redirect"},
{308, "Permanent Redirect"},
{400, "Bad Request"},
{401, "Unauthorized"},
{402, "Payment Required"},
{403, "Forbidden"},
{404, "Not Found"},
{405, "Method Not Allowed"},
{406, "Not Acceptable"},
{407, "Proxy Authentication Required"},
{408, "Request Timeout"},
{409, "Conflict"},
{410, "Gone"},
{411, "Length Required"},
{412, "Precondition Failed"},
{413, "Payload Too Large"},
{414, "URI Too Long"},
{415, "Unsupported Media Type"},
{416, "Range Not Satisfiable"},
{417, "Expectation Failed"},
{418, "I'm a teapot"},
{421, "Misdirected Request"},
{422, "Unprocessable Entity"},
{423, "Locked"},
{424, "Failed Dependency"},
{425, "Too Early"},
{426, "Upgrade Required"},
{428, "Precondition Required"},
{429, "Too Many Requests"},
{431, "Request Header Fields Too Large"},
{451, "Unavailable For Legal Reasons"},
{501, "Not Implemented"},
{502, "Bad Gateway"},
{503, "Service Unavailable"},
{504, "Gateway Timeout"},
{505, "HTTP Version Not Supported"},
{506, "Variant Also Negotiates"},
{507, "Insufficient Storage"},
{508, "Loop Detected"},
{510, "Not Extended"},
{511, "Network Authentication Required"}
};
/*文件拓展名对应的MIME名称*/
std::unordered_map<std::string, std::string> g_extMime = {
{".aac", "audio/aac"},
{".abw", "application/x-abiword"},
{".arc", "application/x-freearc"},
{".avi", "video/x-msvideo"},
{".azw", "application/vnd.amazon.ebook"},
{".bin", "application/octet-stream"},
{".bmp", "image/bmp"},
{".bz", "application/x-bzip"},
{".bz2", "application/x-bzip2"},
{".csh", "application/x-csh"},
{".css", "text/css"},
{".csv", "text/csv"},
{".doc", "application/msword"},
{".docx", "application/vnd.openxmlformats-officedocument.wordprocessingml.document"},
{".eot", "application/vnd.ms-fontobject"},
{".epub", "application/epub+zip"},
{".gif", "image/gif"},
{".htm", "text/html"},
{".html", "text/html"},
{".ico", "image/vnd.microsoft.icon"},
{".ics", "text/calendar"},
{".jar", "application/java-archive"},
{".jpeg", "image/jpeg"},
{".jpg", "image/jpeg"},
{".js", "text/javascript"},
{".json", "application/json"},
{".jsonld", "application/ld+json"},
{".mid", "audio/midi"},
{".midi", "audio/x-midi"},
{".mjs", "text/javascript"},
{".mp3", "audio/mpeg"},
{".mpeg", "video/mpeg"},
{".mpkg", "application/vnd.apple.installer+xml"},
{".odp", "application/vnd.oasis.opendocument.presentation"},
{".ods", "application/vnd.oasis.opendocument.spreadsheet"},
{".odt", "application/vnd.oasis.opendocument.text"},
{".oga", "audio/ogg"},
{".ogv", "video/ogg"},
{".ogx", "application/ogg"},
{".otf", "font/otf"},
{".png", "image/png"},
{".pdf", "application/pdf"},
{".ppt", "application/vnd.ms-powerpoint"},
{".pptx", "application/vnd.openxmlformats-officedocument.presentationml.presentation"},
{".rar", "application/x-rar-compressed"},
{".rtf", "application/rtf"},
{".sh", "application/x-sh"},
{".svg", "image/svg+xml"},
{".swf", "application/x-shockwave-flash"},
{".tar", "application/x-tar"},
{".tif", "image/tiff"},
{".tiff", "image/tiff"},
{".ttf", "font/ttf"},
{".txt", "text/plain"},
{".vsd", "application/vnd.visio"},
{".wav", "audio/wav"},
{".weba", "audio/webm"},
{".webm", "video/webm"},
{".webp", "image/webp"},
{".woff", "font/woff"},
{".woff2", "font/woff2"},
{".xhtml", "application/xhtml+xml"},
{".xls", "application/vnd.ms-excel"},
{".xlsx", "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"},
{".xml", "application/xml"},
{".xul", "application/vnd.mozilla.xul+xml"},
{".zip", "application/zip"},
{".3gp", "video/3gpp"},
{".3g2", "video/3gpp2"},
{".7z", "application/x-7z-compressed"}
};
/*实用工具类*/
class Util {
public:
/*读取文件内容*/
static bool ReadFile(const std::string &filename, std::string *data) {
// 以二进制形式打开文件
std::ifstream ifs(filename.c_str(), std::ios::binary);
if(ifs.is_open() == false) {
LOG(ERROR, "Open %s file failed", filename.c_str());
return false;
}
// 获取文件大小
size_t size;
ifs.seekg(0, std::ios::end); // 跳转读写位置到文件末尾
size = ifs.tellg(); // 获取当前读写位置相较于文件起始位置的偏移量,即文件大小
ifs.seekg(0, std::ios::beg); // 跳转回读写位置到文件起始位置
// 读取文件内容
data->resize(size);
ifs.read(&(*data)[0], size);
// 判断是否读取成功
if(ifs.good() == false) {
LOG(ERROR, "Read %s file content failed", filename.c_str());
ifs.close();
return false;
}
// 关闭文件
ifs.close();
return true;
}
/*向文件中写入数据*/
static bool WriteFile(const std::string &filename, const std::string &data) {
// 以二进制形式打开文件,并清空原文件内容
std::ofstream ofs(filename.c_str(), std::ios::binary | std::ios::trunc);
if(ofs.is_open() == false) {
LOG(ERROR, "Open %s file failed", filename.c_str());
return false;
}
// 向文件中写入数据
ofs.write(data.c_str(), data.size());
// 判断是否写入成功
if(ofs.good() == false) {
LOG(ERROR, "Read %s file content failed", filename.c_str());
ofs.close();
return false;
}
// 关闭文件
ofs.close();
return true;
}
/*URL编码 -- 避免资源请求路径或者查询字符串中的特殊字符与HTTP请求中的的特殊字符产生歧义*/
// RFC3986文档规定的编码格式:特殊字符转化为两个十六进制字符,并带上前缀%. 比如 C++ -> C%2B%2B
// RFC3986文档规定以下字符为绝对不编码字符,即除了它们以外的字符都需要编码:. - _ ~ 数字 字母
// 同时W3C标准规定:查询字符中的空格需要被编码为 + ,而不是两个十六进制字符和前缀%
static std::string UrlEncode(const std::string &url, bool convert_space_to_plus) {
std::string str;
for(auto ch : url) {
// 绝对不编码字符
if (ch == '.' || ch == '-' || ch == '_' || ch == '~' || isalnum(ch)) {
str += ch;
continue;
}
// 空格且空格编码标志为真
if (ch == ' ' && convert_space_to_plus) {
str += '+';
continue;
}
// 编码字符,或者特殊编码字符空格但空格编码标志为假 ch -> %HH
char tmp[4] = { 0 };
// %% -> % %02X -> %X 十六进制大写 0 填充0 2 占位两个字符 ==> %HH
snprintf(tmp, 4, "%%%02X", ch);
str += tmp;
}
return str;
}
/*UrlDecode子函数 -- 将一个十六进制字符转换为一个十进制字符*/
static int HexToDec(char ch) {
if (ch >= '0' && ch <= '9') return ch - '0';
if (ch >= 'a' && ch <= 'z') return ch - 'a' + 10;
if (ch >= 'A' && ch <= 'Z') return ch - 'A' + 10;
return -1;
}
/*URL解码*/
static std::string UrlDecode(const std::string &url, bool convert_plus_to_space) {
// 遇到 % 就将其后面的两个十六进制字符转换为一个十进制字符即可
// 如果设置了空格字符的特殊编码,则将 + 替换为空格
std::string str;
for(int i = 0; i < url.size(); i++) {
if (url[i] == '%' && (i + 2) < url.size()) {
int ch1 = HexToDec(url[i + 1]);
int ch2 = HexToDec(url[i + 2]);
char ch = ch1 * 16 + ch2;
str += ch;
i += 2;
continue;
}
if (url[i] == '+' && convert_plus_to_space) {
str += ' ';
continue;
}
str += url[i];
}
return str;
}
/*获取响应状态码的描述信息*/
static std::string StatuDesc(int statu) {
auto it = g_statuMsg.find(statu);
if (it == g_statuMsg.end()) return "Unknown";
return it->second;
}
/*根据文件扩展名获取文件mime*/
static std::string ExtMime(const std::string &filename) {
// 获取文件扩展名
size_t pos = filename.rfind(".");
// 如果文件没有后缀,则返回二进制流,表示二进制文件
if (pos == std::string::npos) return "application/actet-stream";
std::string suffix = filename.substr(pos);
auto it = g_extMime.find(suffix);
if (it == g_extMime.end()) return "application/actet-stream";
return it->second;
}
/*判断一个文件是否是目录*/
static bool IsDirectory(const std::string &filename) {
struct stat st;
int n = stat(filename.c_str(), &st);
if (n < 0) return false;
// S_ISDIR宏函数:S_ISDIR(m) (((m) & S_IFMT) == S_IFDIR)
return S_ISDIR(st.st_mode);
}
/*判断一个文件是否是普通文件*/
static bool IsRegularFile(const std::string &filename) {
struct stat st;
int n = stat(filename.c_str(), &st);
if (n < 0) return false;
// S_ISREG宏函数:S_ISREG(m) (((m) & S_IFMT) == S_IFREG)
return S_ISREG(st.st_mode);
}
/*字符串分割*/
static size_t Split(const std::string &src, const std::string &sep, std::vector<std::string> *arr) {
arr->clear();
int offset = 0;
// ;abc;;def;efg;
while(offset < src.size()) {
size_t pos = src.find(sep, offset);
if (pos == std::string::npos) {
arr->push_back(src.substr(offset));
return arr->size();
}
// 当子串为空时跳过
if (pos == offset) {
offset = pos + sep.size();
continue;
}
arr->push_back(src.substr(offset, pos - offset));
offset = pos + sep.size();
}
return arr->size();
}
/*判断HTTP请求资源路径是否有效*/
static bool IsValidPath(const std::string &path) {
// 资源路径的有效性是指客户端只能访问web根目录下的文件,不能通过..回退访问根目录的上级目录
// 以/为分隔符将路径分隔开来
std::vector<std::string> arr;
Util::Split(path, "/", &arr);
int level = 0;
for(const auto& s : arr) {
// 遇到..回退上级目录操作目录深度减一
if (s == "..") {
--level;
// 在任意位置如果目录深度小于0,说明访问到了web根目录的上级目录,直接返回false
if (level < 0) return false;
continue;
}
// 遇到正常路径则目录深度加一
++level;
}
return true;
}
};
2. HttpRequest 模块
HttpRequest 模块的设计思想如下:
HttpRequest 类的代码实现如下:
/*存储HTTP请求的相关要素信息*/
class HttpRequest {
public:
HttpRequest() : _version("HTTP/1.1") {}
/*添加头部字段*/
void SetHeader(const std::string &key, const std::string &value) { _headers[key] = value; }
/*查询是否存在指定头部字段*/
bool HasHeader(const std::string &key) const {
auto it = _headers.find(key);
if (it == _headers.end()) return false;
return true;
}
/*获取指定头部字段的值*/
std::string GetHearder(const std::string &key) const {
auto it = _headers.find(key);
if (it == _headers.end()) return "";
return it->second;
}
/*插入查询字符串*/
void SetParam(const std::string &key, const std::string &value) { _params[key] = value; }
/*查询是否存在指定查询字符串*/
bool HasParam(const std::string &key) const {
auto it = _params.find(key);
if (it == _params.end()) return false;
return true;
}
/*获取指定查询字符串的值*/
std::string GetParam(const std::string &key) const {
auto it = _params.find(key);
if (it == _params.end()) return "";
return it->second;
}
/*获取正文长度*/
size_t ContentLength() const {
if (HasHeader("Content-Length") == false) return 0;
return std::stoi(GetHearder("Content-Length"));
}
/*判断是否是短连接通信*/
bool IsClose() const {
// Connection字段存在并且为keep-alive则是长连接通信
if ((HasHeader("Connection") == true) && (GetHearder("Connection") == "keep-alive")) return false;
return true;
}
/*重置请求内容*/
void Reset() {
_method.clear();
_path.clear();
_version = "HTTP/1.1";
_body.clear();
_params.clear();
_headers.clear();
std::smatch match;
_matches.swap(match);
}
public:
// 由于外部需要直接对成员变量进行设置和读取,所以这里使用public修饰
std::string _method; // 请求方法
std::string _path; // 资源路径
std::string _version; // 协议版本
std::string _body; // 请求正文
std::smatch _matches; // 保存资源路径正则匹配的结果
std::unordered_map<std::string, std::string> _params; // 查询字符串
std::unordered_map<std::string, std::string> _headers; // 请求头部字段
};
3. HttpResponse 模块
HttpResponse 模块的设计思想如下:
HttpResponse 类的代码实现如下:
/*保存HTTP响应的相关要素信息*/
class HttpResponse {
public:
HttpResponse() : _statu(200), _redirect_flag(false) {}
HttpResponse(int statu) : _statu(statu), _redirect_flag(false) {}
/*添加头部字段*/
void SetHeader(const std::string &key, const std::string &value) { _headers[key] = value; }
/*查询是否存在指定头部字段*/
bool HasHeader(const std::string &key) const {
auto it = _headers.find(key);
if (it == _headers.end()) return false;
return true;
}
/*获取指定头部字段的值*/
std::string GetHearder(const std::string &key) const {
auto it = _headers.find(key);
if (it == _headers.end()) return "";
return it->second;
}
/*设置响应正文及其相关的头部字段*/
void SetContent(const std::string &body, const std::string &type) {
_body = body;
SetHeader("Content-Type", type);
SetHeader("Content-Length", std::to_string(body.size()));
}
/*设置重定向以及Location头部字段*/
void SetRedirect(const std::string &url, int statu = 302) {
_statu = statu;
_redirect_flag = true;
_redirect_url = url;
SetHeader("Location", _redirect_url);
}
/*判断是否是短连接通信*/
bool IsClose() const {
// Connection字段存在并且为keep-alive则是长连接通信
if ((HasHeader("Connection") == true) && (GetHearder("Connection") == "keep-alive")) return false;
return true;
}
/*重置响应内容*/
void Reset() {
_statu = 200;
_body.clear();
_redirect_flag = false;
_redirect_url.clear();
_headers.clear();
}
public:
// 为了便于外界访问,这里我们同样将成员变量设置为公有
int _statu; // 响应状态码
std::string _body; // 响应正文
bool _redirect_flag; // 重定向标志
std::string _redirect_url; // 重定向路径
std::unordered_map<std::string, std::string> _headers; // 响应头部字段
};
4. HttpContext 模块
HttpContext 模块的设计思想如下:
HttpContext 类的代码实现如下:
/*HTTP请求行的最大长度 -- 8KB*/
#define MAX_LINE 8192
/*HTTP请求接收不同阶段的状态*/
typedef enum {
RECV_HTTP_ERROR, // 接收HTTP请求出错
RECV_HTTP_LINE, // 接收HTTP请求行
RECV_HTTP_HEAD, // 接收HTTP请求头部
RECV_HTTP_BODY, // 接收HTTP请求正文
RECV_HTTP_OVER, // HTTP请求接收完毕
} HttpRecvStatu;
/*HTTP请求接收与解析上下文模块 -- 记录HTTP请求接收与解析的进度*/
class HttpContext {
private:
/*解析HTTP请求行*/
bool ParseHttpLine(const std::string &line) {
// 原始请求行:GET /login?user=zhangsan&pass=12 3456 HTTP/1.1\r\n
// 编码后的请求行:GET %2Flogin?user%3Dzhangsan%26pass%3D12+3456 HTTP/1.1
// 使用正则表达式进行请求行的正则匹配
// std::regex::icase:匹配时忽略大小写
std::smatch matches;
std::regex e("(GET|POST|PUT|DELETE|HEAD) ([^?]*)(?:\\?(.*))? (HTTP/1\\.[01])(?:\n|\r\n)?", std::regex::icase);
bool ret = std::regex_match(line, matches, e);
if (ret == false) {
// 解析失败说明请求格式错误
_recv_statu = RECV_HTTP_ERROR;
// Bad Request
_resp_statu = 400;
return false;
}
// matches中保存的正则匹配后的请求要素信息如下(没有开头的num:):
// 0:GET %2Flogin?user%3Dzhangsan%26pass%3D12+3456 HTTP/1.1\r\n
// 1:GET
// 2:%2Flogin
// 3:user%3Dzhangsan%26pass%3D12+3456
// 4:HTTP/1.1
// 填充请求方法(注意:这里需要将可能的小写请求方法转换为大写)
std::string method = matches[1];
for(int i = 0; i < method.size(); i++) {
method[i] = toupper(method[i]);
}
_request._method = method;
// 填充资源路径 -- 需要进行解码
_request._path = Util::UrlDecode(matches[2], false);
// 填充协议版本
_request._version = matches[4];
// 获取查询字符串 -- 需要进行解码,同时需要将+特殊解码为空格
std::string queryStr = Util::UrlDecode(matches[3], true);
// 分割查询字符串 -- 首先以&为分隔符得到多个查询键值对,然后以=为分隔符得到键值对中的key与value,最终填充到请求信息的_params中
std::vector<std::string> queryStr_arr;
Util::Split(queryStr, "&", &queryStr_arr);
for(auto &kv : queryStr_arr) {
size_t pos = kv.find("=");
// 如果键值对中没有=号分割,说明请求行格式错误
if (pos == std::string::npos) {
_recv_statu = RECV_HTTP_ERROR;
// Bad Request
_resp_statu = 400;
return false;
}
// 填充查询字符串
std::string key = kv.substr(0, pos);
std::string value = kv.substr(pos + 1);
_request.SetParam(key, value);
}
return true;
}
/*接收HTTP请求行*/
bool RecvHttpLine(Buffer *buf) {
if (_recv_statu != RECV_HTTP_LINE) return false;
std::string line = buf->GetLine();
// 如果line长度为0,说明获取一行数据失败
if (line.size() == 0) {
// 如果inbuffer中可读数据长度请求行的最大长度,则说明inbuffer中的请求行数据有问题
if (buf->ReadAbleSize() > MAX_LINE) {
_recv_statu = RECV_HTTP_ERROR;
// URI Too Long
_resp_statu = 414;
return false;
}
// 否则说明inbuffer中数据太少,不足一个完整的请求行,等待新数据到来
return true;
}
// 否则说明成功接收到了请求行
// 如果请求行的数据长度大规定的请求行的最大长度,则也是有问题的
if (line.size() > MAX_LINE) {
_recv_statu = RECV_HTTP_ERROR;
// URI Too Long
_resp_statu = 414;
return false;
}
// 走到这里,说明接收到了一个合法的请求行,调用ParseHttpLine进行解析
bool ret = ParseHttpLine(line);
if (ret == false) return false;
// 如果HTTP请求行解析成功,则将接收状态修改为RECV_HTTP_HEAD
_recv_statu = RECV_HTTP_HEAD;
return true;
}
/*解析HTTP请求头部*/
bool ParseHttpHead(const std::string &line) {
// key: value\r\n
size_t pos1 = line.find(": ");
if (pos1 == std::string::npos) {
_recv_statu = RECV_HTTP_ERROR;
// Bad Request
_resp_statu = 400;
return false;
}
// 找到末尾的\r\n -- 注意有的请求末尾可能只有\n,没有\r
size_t pos2 = line.find('\r');
if (pos2 == std::string::npos) pos2 = line.find('\n');
std::string key = line.substr(0, pos1);
std::string value = line.substr(pos1 + strlen(": "), pos2 - (pos1 + strlen(": ")));
_request.SetHeader(key, value);
return true;
}
/*接收HTTP请求头部*/
bool RecvHttpHead(Buffer *buf) {
if (_recv_statu != RECV_HTTP_HEAD) return false;
// HTTP请求头部格式:key: value\r\nkey: value\r\n...
// 所以逐行取出缓冲区中的数据进行解析,直到遇到空行即可
while(true) {
std::string line = buf->GetLine();
if (line.size() == 0) {
if (buf->ReadAbleSize() > MAX_LINE) {
_recv_statu = RECV_HTTP_ERROR;
_resp_statu = 414;
return false;
}
return true;
}
if (line.size() > MAX_LINE) {
_recv_statu = RECV_HTTP_ERROR;
_resp_statu = 414;
return false;
}
// 如果读取到空行,说明请求头部读取完毕
if (line == "\n" || line == "\r\n") break;
// 调用ParseHttpHead进行解析
bool ret = ParseHttpHead(line);
if (ret == false) return false;
}
// 如果全部请求头部字段都解析成功,则将接收状态修改为RECV_HTTP_BODY
_recv_statu = RECV_HTTP_BODY;
return true;
}
/*接收HTTP请求正文*/
bool RecvHttpBody(Buffer *buf) {
if (_recv_statu != RECV_HTTP_BODY) return false;
// 获取正文长度
size_t total_length = _request.ContentLength();
// 获取已接收的正文长度(_request._body中已有数据的长度)
size_t received_length = _request._body.size();
// 当前仍需接收的正文长度
size_t unreceived_length = total_length - received_length;
// 检查缓冲区中的数据长度,如果大于等于所需数据长度则请求正文接收解析成功,修改接收状态
if (buf->ReadAbleSize() >= unreceived_length) {
std::string body = buf->Read(unreceived_length);
_request._body += body;
_recv_statu = RECV_HTTP_OVER;
return true;
}
// 如果不足所需数据长度,则读取缓冲区中已有的数据,但不修改接收状态,等待新数据到来
std::string body = buf->Read(buf->ReadAbleSize());
_request._body += body;
return true;
}
public:
HttpContext() : _resp_statu(200), _recv_statu(RECV_HTTP_LINE) {}
/*获取HTTP响应状态码*/
int GetRespStatu() { return _resp_statu; }
/*获取HTTP请求接收阶段状态码*/
HttpRecvStatu GetHttpRecvStatu() { return _recv_statu; }
/*接收并解析HTTP请求*/
void RecvAndParse(Buffer *buf) {
switch(_recv_statu) {
// 注意:case后面不需要加break,因为请求行接收解析完毕后,接下来本来就是要接收解析请求头部的,头部后面又是正文
case RECV_HTTP_LINE: RecvHttpLine(buf);
case RECV_HTTP_HEAD: RecvHttpHead(buf);
case RECV_HTTP_BODY: RecvHttpBody(buf);
}
}
/*获取保存请求接收与解析得到的要素信息*/
HttpRequest GetRequest() { return _request; }
/*重置上下文*/
void Reset() {
_resp_statu = 200;
_recv_statu = RECV_HTTP_LINE;
_request.Reset();
}
private:
int _resp_statu; // 响应状态码
HttpRecvStatu _recv_statu; // 当前处于的HTTP请求接收阶段
HttpRequest _request; // HTTP请求接收并解析完毕后的请求要素信息
};
5. HttpServer 模块
HttpServer 模块是对于HTTP协议支持所有模块的整合,其目的是让HTTP服务器的搭建变得更加简便。HttpServer 模块的设计思路如下:
- 我们可以设计一张/多张请求路由表,表中记录了 HTTP 请求与其业务处理函数的映射关系,当服务器收到了一个请求,就在请求路由表中,查找有没有对应请求的处理函数,如果有,则执行对应的处理函数,没有,则返回405页面即可 (Method Not Allowed)。这样做的好处是用户只需要实现业务处理函数,然后将请求与处理函数的映射关系添加到服务器中;而服务器只需要接收数据,解析数据,查找路由表映射关系,最后执行业务处理函数。
HTTP 服务器的运行流程如下:
- 从 socket 中接收数据,放到接收缓冲区。
- 调用 OnMessage 回调函数进行业务处理。
- 对请求进行解析,得到了一个 HttpRequest 结构对象,其中包含了所有的请求要素信息。
- 进行请求的路由查找 – 找到请求对应的处理方法。
- 如果是静态资源请求,比如 html 页面,image 文件等,则将静态资源文件的数据读取出来,填充到 HttpResponse 结构中。
- 如果是功能性请求,则在请求路由映射表中查找处理函数,找到了则执行函数进行具体的业务处理,并进行 HttpResponse 结构的数据填充。
- 对静态资源请求/功能性请求进行处理完毕后,已经得到了一个填充了响应信息的 HttpResponse 对象,将其组织成为 HTTP 格式响应,发送给客户端即可。
HttpServer 类的代码实现如下:
/*整合封装HTTP服务器模块*/
class HttpServer {
using HandleFunc = std::function<void (const HttpRequest &, HttpResponse *)>;
using ConnectionPtr = std::shared_ptr<Connection>;
private:
/*错误处理函数*/
void ErrorHandler(HttpResponse *resp) {
// 组织一个错误页面
std::string body = "<html><head><meta http-equiv='Content-Type' content='text/html;charset=utf-8'>";
body += "</head><body><h1>";
body += std::to_string(resp->_statu) + " " + Util::StatuDesc(resp->_statu);
body += "</h1></body></html>";
// 将响应页面数据作为正文进行响应
resp->SetContent(body, Util::ExtMime(".html"));
}
/*判断是否是静态资源请求*/
bool IsFileRequest(HttpRequest &req) {
// 是否设置了静态资源根目录
if (_wwwroot.empty()) return false;
// 请求方法是否是GET或者HEAD
if (req._method != "GET" && req._method != "HEAD") return false;
// 请求资源路径是否合法
if (Util::IsValidPath(req._path) == false) return false;
// 请求的资源是否存在(注意:如果资源路径是目录,则在其后面默认追加一个index.html)
std::string path = _wwwroot + req._path; // 实际的资源路径
if (path.back() == '/') path += "index.html";
if (Util::IsRegularFile(path) == false) return false;
// 以上条件都满足,说明这是一个静态资源请求,则将相对资源路径替换为实际路径并返回true
req._path = path;
return true;
}
/*静态资源请求业务处理函数*/
void FileHandler(const HttpRequest &req, HttpResponse *resp) {
// 读取静态资源文件中的数据
std::string body;
Util::ReadFile(req._path, &body);
// 设置响应正文及相关头部字段
resp->SetContent(body, Util::ExtMime(req._path));
}
using Handlers = std::vector<std::pair<std::regex, HandleFunc>>;
/*功能性请求业务处理函数*/
void Dispatcher(HttpRequest &req, HttpResponse *resp, Handlers &route) {
// 由于路由表中存储的是正则表达式以及对应的业务处理函数
// 所以我们需要使用路由表中的正则表达式对HTTP请求的资源路径进行正则匹配
// 匹配成功说明该路径有对应的业务处理函数,调用该函数进行业务处理;匹配失败则继续匹配
for (const auto &handler : route) {
// 正则表达式
const std::regex &e = handler.first;
// 正则匹配
bool ret = std::regex_match(req._path, req._matches, e);
// 匹配成功执行对应的业务处理函数
if (ret == true) return handler.second(req, resp);
}
// 如果路由表中没有与该资源路径匹配的元素,则返回404
resp->_statu = 404; // Not Found
}
/*请求路由与业务处理*/
void Route(HttpRequest &req, HttpResponse *resp) {
// 处理静态资源请求
if (IsFileRequest(req) == true) return FileHandler(req, resp);
// 分类处理功能性请求
if (req._method == "GET" || req._method == "HEAD") return Dispatcher(req, resp, _get_route);
else if (req._method == "POST") return Dispatcher(req, resp, _post_route);
else if (req._method == "PUT") return Dispatcher(req, resp, _put_route);
else if (req._method == "DELETE") return Dispatcher(req, resp, _delete_route);
// 既不是静态资源请求,又不是功能性请求,则返回错误响应
resp->_statu = 405; // Method Not Allowed
return;
}
/*根据HttpResponse中的响应要素信息组织HTTP响应并发送*/
void FormResponse(const ConnectionPtr &conn, const HttpRequest &req, HttpResponse &resp) {
// 完善头部字段
if (req.IsClose() == true) resp.SetHeader("Connection", "close");
else resp.SetHeader("Connection", "keep-alive");
// 组织HTTP响应 -- 响应首行+响应头部+空行+正文
std::string respStr;
// 响应首行 -- HTTP/1.1 200 ok\r\n
respStr += req._version + " " + std::to_string(resp._statu) + " " + Util::StatuDesc(resp._statu) + "\r\n";
// 响应头部 -- Content-Type: text/html\r\nContent-Length: 500\r\n...
for(const auto& hander : resp._headers) {
respStr += hander.first + ": " + hander.second + "\r\n";
}
// 空行
respStr += "\r\n";
// 响应正文
respStr += resp._body;
// 发送HTTP响应
conn->Send(respStr.c_str(), respStr.size());
}
/*连接建立成功回调函数 -- 由HttpServer模块设置给TcpServer模块,完成上下文的设置*/
void OnConnected(const ConnectionPtr &conn) {
conn->SetContext(HttpContext());
LOG(DEBUG, "New Connection %p", conn.get());
}
/*收到消息回调函数 -- 由HttpServer模块设置给TcpServer模块,完成业务处理*/
void OnMessage(const ConnectionPtr &conn, Buffer *buf) {
while(true) {
// 获取请求上下文
HttpContext *context = conn->GetContext()->get<HttpContext>();
// 通过上下文对缓冲区数据进行接收与解析,得到HTTP请求要素信息 -- HttpRequest
context->RecvAndParse(buf);
HttpRequest req = context->GetRequest();
HttpResponse resp(context->GetRespStatu());
// 如果接收解析失败,则进行错误处理,构建错误响应,并关闭连接
if (context->GetHttpRecvStatu() == RECV_HTTP_ERROR) {
ErrorHandler(&resp);
FormResponse(conn, req, resp);
// 为了避免接收缓冲区中存在数据导致在实际释放连接(ShutdownInLoop函数)时又重新调用OnMessage函数,我们需要将接收缓冲区中的内容清空
// 即使不清空接收缓冲区中的数据,也需要重置请求接收解析的状态,避免程序陷入死循环
buf->Clear();
conn->ShutDown();
return;
}
// 如果接收解析未出错,但也不是完成状态,说明缓冲区数据不足,等待新数据到来
if (context->GetHttpRecvStatu() != RECV_HTTP_OVER) {
return;
}
// 走到这里说明接收解析完成,则循环的进行路由与业务处理,直到缓冲区中的完整请求都被处理完毕
// 请求路由以及业务处理
Route(req, &resp);
// 根据HttpResponse中的响应要素信息组织HTTP响应并发送
FormResponse(conn, req, resp);
// 重置上下文
context->Reset();
// 检查当前是否是短连接通信,是则直接关闭连接,不是则继续处理
if (resp.IsClose()) {
conn->ShutDown();
return;
}
}
}
/*连接关闭回调函数*/
void OnClose(const ConnectionPtr &conn) {
LOG(DEBUG, "Release Connection %p", conn.get());
}
public:
HttpServer(uint16_t port, uint32_t timeout = DEFAULT_TIMEOUT, std::string ip = "0.0.0.0", bool non_block = true)
: _server(port, ip, non_block) {
// 默认启动非活跃连接的定时销毁,超时时间为30s
if (timeout > 0) _server.EnableInactiveRelease(timeout);
_server.SetConnectedCallback(std::bind(&HttpServer::OnConnected, this, std::placeholders::_1));
_server.SetMessageCallback(std::bind(&HttpServer::OnMessage, this, std::placeholders::_1, std::placeholders::_2));
_server.SetCloseCallback(std::bind(&HttpServer::OnClose, this, std::placeholders::_1));
}
/*设置静态资源根目录*/
void SetWWWRoot(const std::string &wwwroot) {
// 静态资源根目录必须是已存在的目录
assert(Util::IsDirectory(wwwroot) == true);
_wwwroot = wwwroot; }
/*添加GET路由 -- 请求资源的正则表达式与对应的业务处理函数*/
void Get(const std::string ®Exp, const HandleFunc &handler) { _get_route.push_back(make_pair(std::regex(regExp), handler)); }
/*添加POST路由*/
void Post(const std::string ®Exp, const HandleFunc &handler) { _post_route.push_back(make_pair(std::regex(regExp), handler)); }
/*添加PUT路由*/
void Put(const std::string ®Exp, const HandleFunc &handler) { _put_route.push_back(make_pair(std::regex(regExp), handler)); }
/*添加DELETE路由*/
void Delete(const std::string ®Exp, const HandleFunc &handler) { _delete_route.push_back(make_pair(std::regex(regExp), handler)); }
/*设置从属线程池数量*/
void SetThreadCount(size_t count) { return _server.SetThreadCount(count); }
/*启动HTTP服务器*/
void Start() { return _server.Start(); }
private:
// 请求路由表 -- 请求资源的正则表达式与对应的业务处理函数
// 注意:由于std::regex没有默认的哈希函数,也没有默认的相等比较运算符,所以不能使用unordered_map来存储路由信息
std::vector<std::pair<std::regex, HandleFunc>> _get_route;
std::vector<std::pair<std::regex, HandleFunc>> _post_route;
std::vector<std::pair<std::regex, HandleFunc>> _put_route;
std::vector<std::pair<std::regex, HandleFunc>> _delete_route;
std::string _wwwroot; // 静态资源根目录
TcpServer _server; // 主从Reactor高并发TCP服务器
};
6. HttpServer 简单测试
下面我们分别对 HttpServer 的 GPT、POST、PUT 以及 DELETE 请求进行简单测试。
测试代码如下:
#include "http.hpp"
#define WWWROOT "./wwwroot"
/*组织HTTP请求用于简单回显*/
std::string FormRequest(const HttpRequest &req) {
std::string reqStr;
// 请求首行 -- GET /login HTTP/1.1\r\n
reqStr += req._method + " " + req._path + " " + req._version + "\r\n";
// 查询字符串
for(const auto ¶m : req._params) {
reqStr += param.first + ": " + param.second + "\r\n";
}
// 请求头部 -- Content-Type: text/html\r\nContent-Length: 123\r\n...
for(const auto &header : req._headers) {
reqStr += header.first + ": " + header.second + "\r\n";
}
// 空行
reqStr += "\r\n";
// 请求正文
reqStr += req._body;
return reqStr;
}
/*HTTP服务器各种功能性请求对应的业务处理函数,为了便于测试这里只是简单回显*/
void Hello(const HttpRequest &req, HttpResponse *resp) {
// 测试业务处理超时
// sleep(25);
resp->SetContent(FormRequest(req), "text/plain");
}
void Login(const HttpRequest &req, HttpResponse *resp) {
resp->SetContent(FormRequest(req), "text/plain");
}
void PutFile(const HttpRequest &req, HttpResponse *resp) {
resp->SetContent(FormRequest(req), "text/plain");
// 大文件传输测试
// std::string pathname = WWWROOT + req._path;
// Util::WriteFile(pathname, req._body);
}
void DeleteFile(const HttpRequest &req, HttpResponse *resp) {
resp->SetContent(FormRequest(req), "text/plain");
}
int main()
{
HttpServer server(8082);
// 设置静态资源根目录与子线程数量
server.SetWWWRoot(WWWROOT);
server.SetThreadCount(4);
// 添加路由信息
server.Get("/hello", Hello);
server.Post("/login", Login);
server.Put("/file.txt", PutFile);
server.Delete("/file.txt", DeleteFile);
// 启动HTTP服务器
server.Start();
return 0;
}
项目目录结构如下:
静态资源根目录中的文件信息如下:
回显服务器测试结果如下:
登录请求测试结果如下:
PUT 请求测试结果如下:
DELETE 请求测试结果如下:
六、功能测试
前面我们已经完成了 SERVER 模块和协议模块的开发,并进行了简单的功能测试,下面我们来进行一些边界性的功能测试,观察服务器在边界情况下能够正常运行。
1. 服务器长连接测试
创建一个客户端,设置 Connection 头部字段为 keep-alive,观察客户端是否能够持续与服务器进行通信。
/*服务器长连接测试 -- 设置Connection为keep-alive,观察客户端持续通信是否正常*/
void LongConnection_test() {
Socket clientSock;
clientSock.CreateClient(8082, "127.0.0.1");
std::string req = "GET /hello HTTP/1.1\r\nConnection: keep-alive\r\nContent-Length: 0\r\n\r\n";
while(true) {
// 发送数据
ssize_t n = clientSock.Send(req.c_str(), req.size());
// 接收数据
char buf[1024] = { 0 };
n = clientSock.Recv(buf, sizeof(buf) - 1);
LOG(DEBUG, "[%s]", buf);
sleep(3);
}
clientSock.Close();
}
从测试结果可以看到,客户端能够持续与服务器进行通信,并且服务器也不会在 30s 后将客户端连接释放,而是等待客户端主动退出后才会释放:
2. 服务器超时连接测试
客户端连接上服务器后,长时间不给服务器发送数据,观察超时时间 (30s) 后服务器是否会将客户端连接进行释放。
/*服务器非活跃连接释放测试 -- 设置Connection为keep-alive,通信一段时间后客户端不再发送数据,观察TIMEOUT时间后服务器是否会释放客户端连接*/
void ConnectionTimeout_test() {
Socket clientSock;
clientSock.CreateClient(8082, "127.0.0.1");
std::string req = "GET /hello HTTP/1.1\r\nConnection: keep-alive\r\nContent-Length: 0\r\n\r\n";
for(int i = 0; true; i++) {
ssize_t n = clientSock.Send(req.c_str(), req.size());
char buf[1024] = { 0 };
n = clientSock.Recv(buf, sizeof(buf) - 1);
LOG(DEBUG, "[%s]", buf);
sleep(3);
if (i == 3) break;
}
while(true) sleep(1);
clientSock.Close();
}
从测试结果可以看到,服务器经过超时时间后自动将客户端连接释放:
3. 服务器错误请求测试
给服务器发送一个请求,添加头部字段 Content-Length 为100,但实际发送的正文长度不足100,观察服务器的处理结果;我们的预期结果有两种:
- 如果客户端只发生一次请求,由于服务器未接收到完整请求(正文数据不足),所以会等待新数据到来,不会给与客户端响应,直到连接超时释放。
- 如果客户端发送多次请求,那么服务器会将后面的请求字段作为第一次请求的正文,完成业务处理后发送一次响应,但这样很有可能会导致后面的请求解析错误。
/*错误请求测试*/
void ErrorRequest_test() {
Socket clientSock;
clientSock.CreateClient(8082, "127.0.0.1");
// 正文长度声明为100,但实际的正文只有11个字符
std::string req = "GET /hello HTTP/1.1\r\nConnection: keep-alive\r\nContent-Length: 100\r\n\r\nHelloServe";
while(true) {
// 发送一次数据,得不到响应
assert(clientSock.Send(req.c_str(), req.size()) >= 0);
// 发送多次数据,超过100字节时会得到一次响应,后续处理出错导致连接被释放
assert(clientSock.Send(req.c_str(), req.size()) >= 0);
assert(clientSock.Send(req.c_str(), req.size()) >= 0);
char buf[1024] = { 0 };
assert(clientSock.Recv(buf, sizeof(buf) - 1) >= 0);
LOG(DEBUG, "[%s]", buf);
}
while(true) sleep(1);
clientSock.Close();
}
预期结果一,连接超时释放:
预期结果二,请求解析出错:
4. 服务器业务处理超时测试
当服务器达到性能瓶颈,即处理一次业务花费的时间超过了服务器设置的非活跃连接超时时间时,查看服务器的处理情况。我们的预期结果如下:
- 由于服务器进行单次业务处理的时间超过了连接的超时时间,所以可能导致其他连接被拖累从而超时释放,具体来说,假设现在4 5 6 7描述符就绪,并且在处理4号描述符就绪事件时超时,那么会出现以下两种情况:
- 如果4后面的5 6 7号都是通信连接描述符,则并不影响,因为4号描述符就绪事件处理完毕后就会处理它们的就绪事件并刷新其活跃度。
- 如果5号描述符是定时器描述符,此时定时器触发超时,就会执行定时任务,由于6、7号描述符被4号描述符拖累,达到了超时时间,因此会被释放,从而导致在进行6 7业务处理时发生内存访问错误 (6号同理)。
- 因此,在本次事件处理过程中,并不能直接释放通信连接,而应该将释放操作压入任务队列中,待就绪事件全部处理完毕后再真正释放连接。
void HandleTimeout_test() {
signal(SIGCHLD, SIG_IGN);
for (int i = 0; i < 10; i++)
{
pid_t tid = fork();
if (tid == -1) {
LOG(DEBUG, "Fork Error");
return;
}
// 子进程模拟客户端向服务器发送请求
if (tid == 0) {
Socket clientSock;
clientSock.CreateClient(8082, "127.0.0.1");
std::string req = "GET /hello HTTP/1.1\r\nConnection: keep-alive\r\nContent-Length: 0\r\n\r\n";
for(int i = 0; true; i++) {
assert(clientSock.Send(req.c_str(), req.size()) >= 0);
char buf[1024] = { 0 };
assert(clientSock.Recv(buf, sizeof(buf) - 1) >= 0);
LOG(DEBUG, "[%s]", buf);
}
clientSock.Close();
exit(0);
}
}
while(true) sleep(1);
}
测试结果如下:
5. 服务器同时多条请求测试
客户端一次性给服务器发送多条请求,观察服务器处理结果。
/*多条请求处理测试 -- 客户端一次性给服务器发送多条请求,观察服务器处理结果*/
void MutilpleRequests_test() {
Socket clientSock;
clientSock.CreateClient(8082, "127.0.0.1");
std::string req = "GET /hello HTTP/1.1\r\nConnection: keep-alive\r\nContent-Length: 0\r\n\r\n";
// 后面拼接多条请求
req += req;
req += req;
assert(clientSock.Send(req.c_str(), req.size()) >= 0);
char buf[1024] = { 0 };
assert(clientSock.Recv(buf, sizeof(buf) - 1) >= 0);
LOG(DEBUG, "[%s]", buf);
clientSock.Close();
}
测试结果如下,服务器能够正常处理并响应:
6. 服务器大文件传输测试
使用PUT方法向服务器传输大文件,观察服务器处理结果。
/*大文件传输测试 -- 使用PUT方法向服务器传输大文件,观察服务器处理结果*/
void LargeFile_test() {
Socket clientSock;
clientSock.CreateClient(8082, "127.0.0.1");
std::string req = "PUT /file.txt HTTP/1.1\r\nConnection: keep-alive\r\n";
std::string body;
// 由于云服务器内存空间限制,这里test.txt大小为200M
Util::ReadFile("./test.txt", &body);
req += "Content-Length: " + std::to_string(body.size()) + "\r\n\r\n";
assert(clientSock.Send(req.c_str(), req.size()) >= 0);
assert(clientSock.Send(body.c_str(), body.size()) >= 0);
char buf[1024] = { 0 };
assert(clientSock.Recv(buf, sizeof(buf) - 1) >= 0);
LOG(DEBUG, "[%s]", buf);
clientSock.Close();
}
服务器内存情况以及 test.txt 文件情况如下:
测试结果如下,服务器能够正常处理并响应:
7. 服务器性能压力测试
使用服务器压力测试工具 WebBench 模拟多个客户端同时访问服务器,测试服务器的并发量 (可以同时处理多少个客户端的请求而不会出现连接失败) 以及 QPS (每秒钟处理的包的数量)。
测试环境如下:
- 服务器为2核2G带宽4M的云服务器。
- 在服务器上运行 WebBench 程序。
- 使用 WebBench 程序以 1000 的并发量,进行 1h 的测试。
测试结果如下 ( 2000 QPS ):
项目源码:
https://gitee.com/tian-hongjin/project-design/tree/master/TcpServer/source