One Thread One Loop主从Reactor模型⾼并发服务器
文章目录
- One Thread One Loop主从Reactor模型⾼并发服务器
- 一些补充
- HTTP服务器
- Reactor 模型
- eventfd
- 通用类Any
- 目标
- 功能模块划分:
- SERVER模块
- Buffer模块:
- 编写思路:
- 接口设计:
- 具体实现:
- 日志宏模块:
- 编写思路:
- 具体实现:
- Socket模块:
- 编写思路:
- 接口设计:
- 具体实现:
- Channel模块:
- 编写思路:
- 接口设计:
- 具体实现:
- Connection模块
- 编写思路:
- 接口设计:
- 具体实现:
- Acceptor模块:
- 编写思路:
- 接口设计:
- 具体实现:
- TimerQueue模块:
- 编写思路:
- 接口设计:
- 具体实现:
- Poller模块:
- 编写思路:
- 接口设计:
- 具体实现:
- EventLoop模块:
- 编写思路:
- 接口设计:
- 具体实现:
- TcpServer模块:
- 编写思路:
- 接口设计:
- 具体实现:
- 协议模块
- Util模块:
- 具体实现:
- HttpRequest模块:
- 具体实现:
- HttpResponse模块:
- 具体实现:
- 请求接收上下文模块:
- 具体实现:
- HttpServer模块
- 具体实现:
- 性能测试
- 测试环境:
- 测试结果
One Thread One Loop的思想就是把所有的操作都放到⼀个线程中进行,⼀个线程对应⼀个事件处理的循环。
一些补充
HTTP服务器
HTTP(Hyper Text Transfer Protocol),超⽂本传输协议是应⽤层协议,是⼀种简单的请求-响应协议(客⼾端根据⾃⼰的需要向服务器发送请求,服务器针对请求提供服务,完毕后通信结束)。
![外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传](https://img-home.csdnimg.cn/images/20230724024159.png?
HTTP协议是⼀个运⾏在TCP协议之上的应⽤层协议,这⼀点本质上是告诉我们,HTTP服务器其实就是个TCP服务器,只不过在应⽤层基于HTTP协议格式进⾏数据的组织和解析来明确客⼾端的请求并完成业务处理。
如何实现一个HTTP服务器?进行以下几步即可:
- 搭建一个TCP服务器
- 以HTTP协议格式进行解析请求数据,明确客户端的请求
- 给客户端提供对应的服务
- 把服务结果以HTTP形式进行组织发送回客户端
Reactor 模型
Reactor 模式也叫Dispatcher 模式,简单来说我们就是利用I/O多路转接(我们采用ET模式的epoll)统一监听事件,收到事件后分发给处理进程或线程。
Reactor 模式的优点包括:
- 高性能:通过异步处理事件,可以充分利用系统资源,提高性能。
- 可扩展性:可以轻松添加新的事件类型和处理程序,使应用程序更加灵活和可扩展。
- 响应性:能够快速响应事件,适用于需要实时性的应用程序,如网络服务器。
Reactor模式一般可以分为三类:
-
单Reactor单线程:单I/O多路复⽤+业务处理
即在单个线程中进行事件监控和处理。
-
单Reactor多线程:单I/O多路复⽤+线程池(业务处理)
-
多Reactor多线程:多I/O多路复⽤+线程池(业务处理)
基于单Reactor多线程的缺点改进,我们选择让主Reactor线程单独进行新连接事件监控,从属Reactor线程进行IO事件监控,从属Reactor线程再让业务线程池进行业务处理。
注意点:
执行流并非越多越好,执行流太多,反而增加了CPU切换调度的成本
eventfd
eventfd是一种事件通知机制。创建一个描述符用于实现事件通知。
eventfd本质在内核中管理的是一个计数器。创建eventfd就会在内核中创建一个计数器(结构),向eventfd中写入一个数值,用于表示事件通知次数。可以用read进行数据的读取,读取到的数据就是通知的次数。如:每次给eventfd中写入个1,就表示通知1次,连续三次后,read读取的数字就是3,读取后计数清零。
用处:
- 在EventLoop模块中实现线程间的事件通知功能
#include <sys/eventfd.h>
int eventfd(unsigned int inital,int flags);
功能:创建一个eventfd对象,实现事件通知
参数:
initial: 计数初值
flags:
EFD_CLOEXEC 禁止进程复制
EFD_NONBLOCK 启动非阻塞属性
返回值:返回一个文件描述符用于操作
eventfd也是通过read/write/close等进行操作的。
注意点:read和write进行IO的时候数据只能是一个8字节数据。
通用类Any
可以直接用库里自带的,这里用的是自己实现的
#pragma once
#include<iostream>
#include<typeinfo>
#include<cassert>
#include<unistd.h>
class Any
{
private:
class holder
{
public:
virtual ~holder() {}
virtual const std::type_info& type() = 0;
virtual holder *clone() = 0;
};
template <class T>
class placeholder : public holder
{
public:
placeholder(const T& val):_val(val) {};
//获取子类对象保存的数据类型
virtual const std::type_info& type()
{
return typeid(T);
}
//克隆一个新的子类对象
virtual holder *clone()
{
return new placeholder(_val);
}
public:
T _val;
};
holder* _content;
public:
Any():_content(nullptr) {}
template<class T>
Any(const T& val)
:_content(new placeholder<T>(val))
{}
Any(const Any& other)
:_content(other._content ? other._content->clone() : nullptr){} //拷贝构造的对象为空就置为空,非空调用拷贝对象的clone
~Any()
{
delete _content;
}
Any& swap(Any& other)
{
std::swap(_content,other._content);
return *this;
}
template<class T>
T* get() //返回子类对象保存数据的指针
{
//获取类型与保存类型必须一致
assert(typeid(T) == _content->type());
return &((placeholder<T>*)_content)->_val; //考虑好优先级 基类指针强转为派生类才能访问派生类对象中存储的数据
}
template<class T>
Any& operator=(const T& val) //赋值运算符重载函数
{
//类似于现代写法
Any(val).swap(*this);
return *this; //返回引用 为了能连续赋值
}
Any& operator=(const Any& other)
{
Any(other).swap(*this);
return *this;
}
};
目标
我们要实现的服务器本⾝并不存在业务,咱们要实现的应该算是⼀个⾼性能服务器基础库,是⼀个基础组件。
采用多Reactor多线程模式(也叫主从Reactor模型)
**为了方便实现,将业务线程部分去掉了,直接交给从属Reactor线程进行处理,即One Thread One Event Loop.**一个线程对应了一个循环:IO事件监控+IO操作+业务处理
功能模块划分:
基于以上的理解,我们要实现的是⼀个带有协议⽀持的Reactor模型⾼性能服务器,因此将整个项⽬的实现划分为两个⼤的模块:
• SERVER模块:实现Reactor模型的TCP服务器;
• 协议模块:对当前的Reactor模型服务器提供应⽤层协议⽀持
SERVER模块
SERVER模块就是对所有的连接以及线程进⾏管理,让它们各司其职,在合适的时候做合适的事,最终完成⾼性能服务器组件的实现。
Buffer模块:
编写思路:
1.实现缓冲区,得有一块内存空间。采用vector容器进行空间管理(为何不用string,因为string操作中遇到‘/0’就停止)
2.要素:
- 默认的空间大小
- 当前的读取数据位置
- 当前的写入数据位置
以类似于循环数组的方式存储数据,当剩余空间不足时再选择扩容
写入数据:
从当前写入位置指向的位置开始写入。看剩余空间是否足够(包含读偏移之前的空闲空间):
- 足够:数据都前移到起始位置
- 不够:从当前位置扩容足够大小
写入成功后,写偏移向后移动
读取数据
有数据可读时,从读偏移指向位置读取。
可读数据大小 = 写偏移 - 读偏移
接口设计:
//接口设计
class Buffer{
private:
std::vector<char> _buffer;
uint64_t _read_index;
uint64_t _write_idx;
public:
//1.获取当前写位置地址
//2.确保可写空间足够 (足够:移动 不足:扩容)
//3.获取读偏移前空闲空间大小
//4.获取写偏移后空闲空间大小
//5.写位置向后移动指定长度
//6.读位置向后移动指定长度
//7.获取读位置地址
//8.获取可读空间大小
//9.清理功能 - 读写位置归0
};
具体实现:
class Buffer //自己实现时删除冗余部分
{
private:
std::vector<char> _buffer; // 使用vector进行内存空间管理
uint64_t _read_idx; // 读偏移
uint64_t _writer_idx; // 写偏移
public:
Buffer() : _read_idx(0), _writer_idx(0), _buffer(BUFFER_DEFAULT_SIZE) {}
char *Begin()
{
return &*_buffer.begin();
}
// 1.获取当前写入起始地址
char *WritePosition()
{
// buffer的空间起始地址加上写偏移量
return Begin() + _writer_idx;
}
// 2.获取当前读取起始地址
char *ReadPosition()
{
return Begin() + _read_idx;
}
// 3.获取空闲空间大小 前沿空间大小 后延空间大小
uint64_t TailIdleSize()
{
// 总体空间大小减去写偏移
return _buffer.size() - _writer_idx;
}
uint64_t HeadIdleSize()
{
return _read_idx;
}
// 4.获取可读数据大小
uint64_t ReadAbleSize()
{
return _writer_idx - _read_idx;
}
// 5.将读偏移向后移动
void MoveReadOffset(uint64_t len)
{
// 向后移动大小必须小于可读数据大小
if(0 == len){
return;
}
assert(len <= ReadAbleSize());
_read_idx += len;
}
// 6.将写偏移向后移动
void MoveWriteOffset(uint64_t len)
{
assert(len <= TailIdleSize()); // 写入前需要确保后延空间足够
_writer_idx += len;
}
// 7.确保可写空间足够 数据挪动 还是 扩容
void EnsureWriteSpace(uint64_t len)
{
// 1.末尾空闲空间足够 就返回
if (TailIdleSize() >= len)
{
return;
}
// 末尾空间不够 加起始空间大小,够就挪动 不够就扩容
if (len <= TailIdleSize() + HeadIdleSize())
{
// 挪动数据
uint64_t rsz = ReadAbleSize(); // 获取当前数据大小
std::copy(ReadPosition(), ReadPosition() + rsz, Begin()); // 可读数据拷贝到了起始位置
_read_idx = 0; // 读偏移归零
_writer_idx = rsz; // 写偏移置为可读数据大小即写偏移量
}
else // 空间不够 需扩容
{
// 不移动数据,直接给写偏移之后扩容足够空间即可
_buffer.resize(_writer_idx + len);
}
}
// 写入数据
void Write(const void *data, uint64_t len)
{
if(0 == len){
return;
}
// 1.保证空间足够 2.拷贝数据
EnsureWriteSpace(len);
const char *d = (const char *)data;
std::copy(d, d + len, WritePosition());
}
void WriteAndPush(const void *data, uint64_t len)
{
Write(data, len);
MoveWriteOffset(len);
}
void WriteString(const std::string &data)
{
Write(data.c_str(), data.size());
}
void WriteStringAndPush(const std::string &data)
{
WriteString(data);
MoveWriteOffset(data.size());
}
void WriteBuffer(Buffer &data)
{
Write(data.ReadPosition(), data.ReadAbleSize());
}
void WriteBufferAndPush(Buffer &data)
{
WriteBuffer(data);
MoveWriteOffset(data.ReadAbleSize());
}
// 读取数据
void Read(void *buf, uint64_t len)
{
// 要求获取数据大小小于可读数据大小
assert(len <= ReadAbleSize());
std::copy(ReadPosition(), ReadPosition() + len, (char *)buf);
}
void ReadAndPop(void *buf, uint64_t len) // 读数据并且弹出已读数据
{
Read(buf, len);
MoveReadOffset(len);
}
std::string ReadAsString(uint64_t len)
{
assert(len <= ReadAbleSize());
std::string str;
str.resize(len);
Read(&str[0], len);
return str;
}
std::string ReadAsStringAndPop(uint64_t len)
{
assert(len <= ReadAbleSize());
std::string str = ReadAsString(len);
MoveReadOffset(len);
return str;
}
char *FindCRLF()
{
void *res = memchr(ReadPosition(), '\n', ReadAbleSize()); // 在空间中找对应字符
return (char *)res;
}
std::string GetLine()
{
char *pos = FindCRLF();
if (pos == nullptr)
{
return "";
}
return ReadAsString(pos - ReadPosition() + 1); //+1是为了取出换行字符
}
std::string GetLineAndPop()
{
std::string str = GetLine();
MoveReadOffset(str.size());
return str;
}
// 清空缓冲区
void Clear() // 将偏移量归零即可
{
_read_idx = 0;
_writer_idx = 0;
}
};
日志宏模块:
编写思路:
为了方便调试,我编写了一个简单的打印日志的宏函数 ,首先设置了三个日志等级INF、DBG、ERR,我们可以通过更改打印的日志等级来实现日志是否需要打印,所以我们额外分别设置这三个等级的日志接口,其实就是在我们基本的日志宏函数传了个等级参数罢了。
具体实现:
#pragma once
#include<ctime>
#include<cstdio>
#define INF 0
#define DBG 1
#define ERR 2
#define LOG_LEVEL DBG
#define LOG(level,format,...) do{\
if(level < LOG_LEVEL) break;\
time_t t = time(NULL);\
struct tm* ptm = localtime(&t);\
char tmp[32] = {0};\
strftime(tmp,31,"%H:%M:%S",ptm);\
fprintf(stdout,"[%s %s:%d]" format "\n",tmp,__FILE__,__LINE__,##__VA_ARGS__);\
}while(0);
#define INFLOG(format,...) LOG(INF,format,##__VA_ARGS__);
#define DBGLOG(format,...) LOG(DBG,format,##__VA_ARGS__);
#define ERRLOG(format,...) LOG(ERR,format,##__VA_ARGS__);
Socket模块:
编写思路:
常规的套接字接口,如上图所示的10个功能,需要进行实现。除此之外,我们额外需要实现两个接口:
- 开启地址端口重用 — 防止服务器出问题后无法立即重启
- 设置套接字为非阻塞属性 — ET模式的要求,我们需要一次性读完需要的数据(即读到没有数据为止),如果是阻塞属性会导致阻塞,因此我们需要设置非阻塞属性 。
接口设计:
class Socket{
private:
int _sockfd;
public:
Socket(); //构造
~Scoket(); //析构
//1.创建套接字
//2.绑定地址信息
//3.开始监听
//4.向服务器发起连接
//5.获取新连接
//6.接收数据
//7.发送数据
//8.关闭套接字
//9.创建一个服务端连接
//10.创建一个客户端连接
//11.开启地址端口重用
//12.设置套接字为非阻塞属性
};
具体实现:
#define MAX_LISTEN 1024
class Socket
{
private:
int _sockfd;
public:
Socket() : _sockfd(-1) {}
Socket(int fd) : _sockfd(fd) {}
~Socket()
{
Close();
}
int FD(){
return _sockfd;
}
// 创建套接字
bool Create()
{
// int socket(int domain, int type, int protocol);
_sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (_sockfd < 0)
{
ERR_LOG("CREATE SOCKET FAILED");
return false;
}
return true;
}
// 绑定地址信息
bool Bind(const std::string &ip, uint16_t port)
{
// int bind(int sockfd, const struct sockaddr *addr, socklen_t addrlen);
struct sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
addr.sin_addr.s_addr = inet_addr(ip.c_str());
socklen_t len = sizeof(addr);
int ret = bind(_sockfd, (struct sockaddr *)&addr, len);
if (ret < 0)
{
ERR_LOG("BIND FALIED");
return false;
}
return true;
}
// 开始监听
bool Listen(int backlog = MAX_LISTEN)
{
// int listen(int sockfd, int backlog);
int ret = listen(_sockfd, backlog);
if (ret < 0)
{
ERR_LOG("LISTEN FALIED");
return false;
}
return true;
}
// 向服务器发起连接
bool Connect(const std::string &ip, uint16_t port)
{
// int connect(int sockfd, const struct sockaddr *addr,socklen_t addrlen);
struct sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
addr.sin_addr.s_addr = inet_addr(ip.c_str());
socklen_t len = sizeof(addr);
int ret = connect(_sockfd, (struct sockaddr *)&addr, len);
if (ret < 0)
{
ERR_LOG("CONNECT FALIED");
return false;
}
return true;
}
// 获取新连接
int Accept()
{
// int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen);
int newfd = accept(_sockfd, NULL, NULL);
if (newfd < 0)
{
ERR_LOG("ACCEPT FAILED");
return -1;
}
return newfd;
}
// 接收数据
ssize_t Recv(void *buf, size_t len, int flag = 0)
{
// ssize_t recv(int sockfd, void *buf, size_t len, int flags);
ssize_t ret = recv(_sockfd, buf, len, flag);
if (ret <= 0)
{ // EINTR 表示当前socket阻塞等待,被信号打断了
if (errno == EAGAIN || errno == EINTR)
{ // EAGAIN 当前socket的接收缓冲区没数据,非阻塞情况下才有该错误
return 0;
}
return -1;
}
return ret; // 实际接收数据长度
}
// 非阻塞接收数据
ssize_t NonBlockRecv(void *buf, size_t len)
{
return Recv(buf, len, MSG_DONTWAIT); // MSG_DONTWAIT 表示当前接收为非阻塞
}
// 发送数据
ssize_t Send(const void *buf, size_t len, int flag = 0)
{
// ssize_t send(int sockfd, const void *buf, size_t len, int flags);
ssize_t ret = send(_sockfd, buf, len, flag);
if (ret < 0)
{
ERR_LOG("SEND FAILED");
return -1;
}
return ret; // 实际发送的数据长度
}
ssize_t NonBlockSend(void *buf, size_t len)
{
return Send(buf, len, MSG_DONTWAIT); // MSG_DONTWAIT 表示当前接收为非阻塞
}
// 关闭套接字
bool Close()
{
if(_sockfd != -1){
close(_sockfd);
}
_sockfd = -1;
}
// 创建一个服务器连接
bool CreateServer(uint16_t port,bool flag = false){
//1.创建套接字 //2.绑定地址 //3.开始监听 //4,设置非阻塞 //5.启动地址重用
if(Create() == false) return false;
if(flag) NonBlock();
if(Bind("0.0.0.0",port) == false) return false;
if(Listen() == false) return false;
ReuseAddress();
return true;
}
// 创建一个客户端连接
bool CreateClient(const std::string& ip,uint16_t port){
//1,创建套接字
//2.连接服务器
if(Create() == false) return false;
if(Connect(ip,port) == false) return false;
return true;
}
// 设置套接字选项---开启地址端口重用
void ReuseAddress(){
// int setsockopt(int sockfd, int level, int optname,const void *optval, socklen_t optlen);
int val = 1;
setsockopt(_sockfd,SOL_SOCKET,SO_REUSEADDR,(void*)&val,sizeof(int));
val = 1;
setsockopt(_sockfd,SOL_SOCKET,SO_REUSEPORT,(void*)&val,sizeof(int));
}
// 设置套接字阻塞属性
void NonBlock(){
//int fcntl(int fd, int cmd, ... /* arg */ );
int flag = fcntl(_sockfd,F_GETFL,0);
fcntl(_sockfd,F_SETFL,flag|O_NONBLOCK);
}
};
Channel模块:
编写思路:
- 对一个描述符进行事件管理:因为我们是用epoll进行事件监控,所以我们操作就是uint32_t event标志位
- 如EPOLLIN(可读)、EPOLLOUT(可写)、EPOLLRDHUP(连接断开)、EPOLLHUP(挂断)、EPOLLERR(错误)、EPOLLPRI(优先数据) 等
- 对事件触发后如何处理进行管理:
- 需要处理的事件:可读、可写、挂断、错误、任意
- 有五种事件需要处理,需要五个回调函数
接口设计:
class Channel{
private:
uint32_t _events; //当前需要监控的事件
uint32_t _revents; //当前实际触发的事件
using EventCallback = std::function<void( )>;
EventCallback _read_callback; //可读事件回调函数
EventCallback _write_callback; //可写事件回调函数
EventCallback _error_callback; //错误事件回调函数
EventCallback _close_callback; //连接断开回调函数
EventCallback _event_callback; //任意事件回调函数
public:
//事件管理
//1.描述符是否可读
//2.描述符是否可写
//3.对描述符监控可读
//4.对描述符监控可写
//5.解除读事件监控
//6.解除写事件监控
//7.解除所有监控
//8.移除监控,将描述符信息从epoll中移除
//事件触发后的处理管理
//1.设置各种事件的回调
//2.整合的一个事件处理函数,触发了事件就调用该函数,根据_revents来判断调用对应的处理函数
};
具体实现:
// channel模块
// 对一个描述符进行IO事件管理 实现对描述符可读、可写等的操作
// Poller模块对描述符进行IO 事件监控就绪后,回调不同的处理函数功能
class Poller;
class EventLoop;
class Channel
{
private:
int _fd;
uint32_t _event; // 当前需要监控的事件
uint32_t _events; // 当前连续触发的事件
EventLoop *_loop;
using EventCallBack = std::function<void()>;
EventCallBack _read_callback; // 读事件触发回调
EventCallBack _write_callback; // 写事件触发回调
EventCallBack _error_callback; // 错误事件触发回调
EventCallBack _close_callback; // 断开事件触发回调
EventCallBack _event_callback; // 任意事件触发回调
public:
Channel(EventLoop *loop, int fd) : _loop(loop), _fd(fd), _event(0), _events(0){};
int Fd()
{
return _fd;
}
uint32_t Event()
{
return _event;
}
// 设置实际就绪实际
void SetEvents(uint32_t _event)
{
_events = _event;
}
// 1.是否可读
bool IsRead()
{
return _event & EPOLLIN;
}
// 2.是否可写
bool IsWrite()
{
return _event & EPOLLOUT;
}
// 3.使 可读
void EnableRead()
{
_event |= EPOLLIN;
UpDate();
}
// 4.使 可写
void EnableWrite()
{
_event |= EPOLLOUT;
UpDate();
}
// 5.取消可读
void DisableRead()
{
_event &= ~EPOLLIN;
UpDate();
}
// 6.取消可写
void DisableWrite()
{
_event &= ~EPOLLOUT;
UpDate();
}
// 7.取消所有监控
void DisableAll()
{
_event = 0;
UpDate();
}
// 8.设置各个回调函数
void SetWriteCallBack(const EventCallBack &cb)
{
_write_callback = cb;
}
void SetReadCallBack(const EventCallBack &cb)
{
_read_callback = cb;
}
void SetErrorCallBack(const EventCallBack &cb)
{
_error_callback = cb;
}
void SetCloseCallBack(const EventCallBack &cb)
{
_close_callback = cb;
}
void SetEventCallBack(const EventCallBack &cb)
{
_event_callback = cb;
}
// 9.事件处理函数
void HandleEvent()
{
if (_event_callback)
{
_event_callback();
}
if ((_events & EPOLLIN) || (_events & EPOLLRDHUP) || (_events & EPOLLPRI))
{ // 对方关闭连接也触发可读事件
if (_read_callback){
_read_callback();
}
}
if (_events & EPOLLOUT)
{
if (_write_callback)
{
_write_callback();
}
} // 有可能释放连接的操作事件,一次只处理一个
else if (_events & EPOLLERR)
{
if (_error_callback)
{
_error_callback();
}
}
else if (_events & EPOLLHUP)
{
if (_close_callback)
{
_close_callback();
}
}
}
void UpDate();
void ReMove();
// void UpDate(){
// return _poller->UpDateEvent(this);
// }
// //移除监控
// void ReMove(){
// return _poller->ReMoveEvent(this);
// }
};
Connection模块
编写思路:
对连接进行全方位的管理。如:
- 套接字的管理: 进行套接字的操作
- 连接事件的管理: 可读、可写、错误、挂断、任意
- 缓冲区的管理,便于socket数据的接收和发送。
- 协议上下文的管理,请求数据的处理
- 回调函数的管理
接口设计:
enum ConnStatu{
DISCONNECTED, //连接关闭状态
CONNECTING, // 连接建立成功 - 待处理状态
CONNECTED, //连接建立完成,各设置已完成 可以通信
DISCONNCETING //待关闭状态
};
class Connection{
private:
uint64_t _conn_id; //唯一标识 连接id
ConnStatu _statu; //连接状态
bool _enable_inactive_release; //连接是否启动非活跃销毁的判断标志
EventLoop* _loop; //连接所关联的一个Eventloop
int _sockfd; //连接关联的描述符
Socket _sock; //套接字操作管理
Channel _channel; //连接事件管理
Buffer _in_buffer; //输入缓冲区
Buffer _out_buffer; //输出缓冲区
Any _context; //请求的接收处理上下文
//这四个回调函数是由组件使用者来设置的
using ConnectedCallback = std::function<void(const PtrConnection&)>;
using MessageCallback = std::function<void(const PtrConnection&,Buffer*)>;
using ClosedCallback = std::function<void(const PtrConnection&)>;
using AnyEventCallback = std::function<void(const PtrConnection&)>;
ConnectedCallback _connected_callback;
MessageCallback _message_callback;
ClosedCallback _close_callback;
AnyEventCallback _event_callback;
//组件内的连接关闭回调
ClosedCallback _server_closed_callback;
public:
//构造
//析构
//1.发送数据
//2.提供给组件使用者的关闭接口
//3.启动非活跃连接销毁
//4.取消非活跃连接销毁
//5.协议切换 --- 重置上下文和阶段性处理函数
具体实现:
enum ConnStatu{
DISCONNECTED, //连接关闭状态
CONNECTING, // 连接建立成功 - 待处理状态
CONNECTED, //连接建立完成,各设置已完成 可以通信
DISCONNCETING //待关闭状态
};
class Connection;
using PtrConnection = std::shared_ptr<Connection>;
class Connection: public std::enable_shared_from_this<Connection>{
private:
uint64_t _con_id; //连接的唯一ID,便于连接管理和查找
//uint64_t _timer_id; //定时器ID,必须唯一 为了简化操作,直接使用_con_id作为定时器id.
int _sockfd; //连接关联的文件描述符
bool _enable_inactive_release; //连接是否启动非活跃销毁的判断标志
EventLoop* _loop; //连接所关联的一个Eventloop
Socket _socket; //套接字操作管理
Channel _channel; //连接事件管理
Buffer _in_buffer; //输入缓冲区 --- 存放从socket中读取到的数据
Buffer _out_buffer; //输出缓冲区 --- 存放发送给对端的数据
Any _contex; //请求的接收上下文
ConnStatu _statu; //连接的状态
//这四个回调函数是由组件使用者来设置的
using ConnectedCallback = std::function<void(const PtrConnection&)>;
using MessageCallback = std::function<void(const PtrConnection&,Buffer*)>;
using ClosedCallback = std::function<void(const PtrConnection&)>;
using AnyEventCallback = std::function<void(const PtrConnection&)>;
ConnectedCallback _connected_callback;
MessageCallback _message_callback;
ClosedCallback _close_callback;
AnyEventCallback _event_callback;
//组件内的连接关闭回调
ClosedCallback _server_closed_callback;
private:
//五个channel的事件回调函数
void HandlerRead(){ //描述符可读事件触发后调用的函数
char buf[65536];
ssize_t ret = _socket.NonBlockRecv(buf,65535);
if(ret<0){
return ShutDownInLoop();
//return;
}else if(ret == 0){
return; //这里的等于0是未读取到数据 而非连接断开
}
//将数据放入输入缓冲区
_in_buffer.WriteAndPush(buf,ret);
//2.调用message_callback 进行业务处理
if(_in_buffer.ReadAbleSize()>0){
return _message_callback(shared_from_this(),&_in_buffer);
}
return ReleaseInLoop(); //实际的关闭操作
}
void HandlerWrite(){ //描述符触发写事件
//out_buff中保存的数据就是要发送的数据
ssize_t ret = _socket.NonBlockSend(_out_buffer.ReadPosition(),_out_buffer.ReadAbleSize());
if(ret<0){
//发送错误该关闭连接了
if(_in_buffer.ReadAbleSize() > 0){
_message_callback(shared_from_this(),&_in_buffer);
return ReleaseInLoop(); //这时候就是实际的关闭释放操作了
}
}
_out_buffer.MoveReadOffset(ret); //记得将读偏移向后移动
if(_out_buffer.ReadAbleSize() == 0){
_channel.DisableWrite(); //没有数据待发送就关闭写事件监控
if(_statu == DISCONNCETING){
return ReleaseInLoop();
}
}
return;
}
void HandlerClose(){
//一旦连接挂断,套接字就什么都干不了,因此有数据待处理就处理 完成关闭连接
if(_in_buffer.ReadAbleSize()>0){
_message_callback(shared_from_this(),&_in_buffer);
}
return ReleaseInLoop();
}
void HandlerError(){
HandlerClose();
}
void HandlerEvent(){ // 1.刷新连接活跃度 延迟定时销毁任务 2.调用组件使用者的任意事件回调
if(_enable_inactive_release == true){
_loop->TimerRefresh(_con_id);
}
if(_event_callback){
_event_callback(shared_from_this());
}
}
void EstablishedInLoop(){ //连接获取之后,给channel设置事件回调 启动读监控
//1.修改连接状态 2.启动读事件监控 3.调用回调函数
assert(_statu == CONNECTING); //状态必定是半连接状态
_statu = CONNECTED;
_channel.EnableRead();
if(_connected_callback){
_connected_callback(shared_from_this());
}
}
void SendInLoop(const char* data, size_t len){ //并不是实际的发送接口 只是把数据放到发送缓冲区 启动可写事件监控
if(_statu == DISCONNECTED) return;
_out_buffer.WriteAndPush(data,len);
if(_channel.IsWrite() == false){
_channel.EnableWrite();
}
}
void ReleaseInLoop(){ //这个接口才是实际的释放接口
// 1.修改连接状态 置为DISCONNECTED 2.移除连接的事件监控 3. 关闭描述符 4.如果当前定时器队列中还有定时销毁任务,则取消任务 5.调用关闭回调函数
_statu = DISCONNECTED;
_channel.ReMove();
_socket.Close();
if(_loop->HasTimer(_con_id)){ //.......................把这个删掉了 额外添加了个特化的
CancleInactiveRelease();
}
if(_close_callback){
_close_callback(shared_from_this());
}
if(_server_closed_callback){ //移除服务器内部管理的连接信息
_server_closed_callback(shared_from_this());
}
}
void ShutDownInLoop(){ //这个关闭操作并非实际的连接释放操作 需判断还有没有数据待处理
_statu == DISCONNCETING;
if(_in_buffer.ReadAbleSize()>0){
if(_message_callback){
_message_callback(shared_from_this(),&_in_buffer);
}
}
if(_out_buffer.ReadAbleSize()>0){ //要么写入数据时出错关闭,要么没数据发送直接关闭
if(_channel.IsWrite() == false){
_channel.EnableWrite();
}
}
if(_out_buffer.ReadAbleSize() == 0){
ReleaseInLoop();
}
}
void EnableInactiveReleaseInLoop(int sec){ //启动超时释放规则
//1.将判断标准 _enable_inactive_release 置为true
_enable_inactive_release = true;
//2.添加定时销毁任务 如果已经存在 刷新延迟即可
if(_loop->HasTimer(_con_id)){
return _loop->TimerRefresh(_con_id);
}
//3.如果不存在定时销毁任务,则新增
_loop->TimerAdd(_con_id,sec,std::bind(&Connection::ReleaseTimeInLoop,this));
}
void CancleInactiveReleaseInLoop(){
_enable_inactive_release = false;
if(_loop->HasTimer(_con_id))
_loop->TimerCancle(_con_id);
}
void UpgradeInLoop(const Any& context,const ConnectedCallback& conn, const MessageCallback& msg,
const ClosedCallback& closed,const AnyEventCallback& event){ //切换协议 重置上下文数据和阶段处理函数
SetContext(context);
_connected_callback = conn;
_message_callback = msg;
_close_callback = closed;
_event_callback = event;
}
public:
Connection(EventLoop* loop,uint64_t con_id,int sockfd)
:_con_id(con_id), _sockfd(sockfd) , _enable_inactive_release(false), _loop(loop),_statu(CONNECTING),_socket(sockfd),_channel(loop,_sockfd){
_channel.SetCloseCallBack(std::bind(&Connection::HandlerClose,this));
_channel.SetEventCallBack(std::bind(&Connection::HandlerEvent,this));
_channel.SetReadCallBack(std::bind(&Connection::HandlerRead,this));
_channel.SetWriteCallBack(std::bind(&Connection::HandlerWrite,this));
_channel.SetErrorCallBack(std::bind(&Connection::HandlerError,this));
}
~Connection(){
DBG_LOG("RELEASE CONNECTION: %p",this);
}
int FD(){
return _sockfd;
}
int ID(){
return _con_id;
}
bool Connected(){
return (_statu == CONNECTED);
}
ConnStatu Statu(){
return _statu ;
} //返回状态
void SetContext(const Any&context){ //设置上下文
_contex = context;
}
Any* GetContext(){ //获取上下文
return &_contex;
}
void SetConnectedCallback(const ConnectedCallback& cb){
_connected_callback = cb;
}
void SetMessageCallback(const MessageCallback& cb){
_message_callback = cb;
}
void SetClosedCallback(const ClosedCallback& cb){
_close_callback = cb;
}
void SetAnyEventCallback(const ConnectedCallback& cb){
_event_callback = cb;
}
void SetServerClosedCallback(const ClosedCallback& cb){
_server_closed_callback = cb;
}
void Established(){ //连接就绪后进行channel回调设置 启动读监控 调用_connected_callback
_loop->RunInLoop(std::bind(&Connection::EstablishedInLoop,this));
}
void Send(const char* data, size_t len){ //发送数据到发送缓冲区,启动写事件监控
_loop->RunInLoop(std::bind(&Connection::SendInLoop,this,data,len));
}
void Shutdown(){ //关闭接口 --- 并不实际关闭, 需判断是否有数待处理
_loop->RunInLoop(std::bind(&Connection::ShutDownInLoop,this));
}
void EnableInactiveRelease(int sec){ //启动非活跃销毁,并定义多长时间 添加 定时任务
_loop->RunInLoop(std::bind(&Connection::EnableInactiveReleaseInLoop,this,sec));
}
void CancleInactiveRelease(){
_loop->RunInLoop(std::bind(&Connection::CancleInactiveReleaseInLoop,this));
}
void ReleaseTimeInLoop(){
_loop->RunInLoop(std::bind(&Connection::ReleaseInLoop,this));
}
void Upgrade(const Any& context,const ConnectedCallback& conn, const MessageCallback& msg,
const ClosedCallback& closed,const AnyEventCallback& event){ //切换协议 重置上下文数据和阶段处理函数 -- 非线程安全
//防备新事件触发后 ,处理的时候切换协议还没执行,导致数据用原协议处理了
_loop->AssertInLoop();
_loop->RunInLoop(std::bind(&Connection::UpgradeInLoop,this,context,conn,msg,closed,event));
}
};
Acceptor模块:
编写思路:
创建一个监听套接字,启动读事件监控,事件触发后获取新连接,调用新连接获取成功后的回调函数。
新连接获取成功的回调由后面的服务器模块管理。
接口设计:
class Acceptor{
private:
Socket _socket; //用于创建监听套接字
EventLoop* _loop; //对监听套接字进行事件监控
Channel _channel; //用于对监听套接字进行事件管理
using AcceptCallback = std::functon<void(int)>;
AcceptCallback _accept_callback;
private:
//监听套接字的读事件回调
public:
//构造
//析构
//设置回调函数
};
具体实现:
class Acceptor{
private:
Socket _socket; // 用于创建监听套接字
EventLoop* _loop; //用于对监听套接字进行事件监控
Channel _channel; //用于对监听套接字进行事件管理
using AcceptCallback = std::function<void(int)>;
AcceptCallback _accept_callback;
private:
//监听套接字的读事件回调处理函数 --- 获取新连接,调用_accept_callback函数进行新连接处理
void HandRead(){
int newfd = _socket.Accept();
if(newfd<0){
return;
}
if(_accept_callback) _accept_callback(newfd);
}
int CreateServer(int port){
bool ret = _socket.CreateServer(port);
assert(ret == true);
return _socket.FD();
}
public:
//不能将启动读事件监控放到构造函数中,必须在设置回调函数后,再去启动
Acceptor(EventLoop* loop,int port):_socket(CreateServer(port)),_loop(loop),
_channel(_loop,_socket.FD()){
_channel.SetReadCallBack(std::bind(&Acceptor::HandRead,this));
}
void SetAcceptCallback(const AcceptCallback& cb){
_accept_callback = cb;
}
void Listen(){
_channel.EnableRead();
}
};
TimerQueue模块:
编写思路:
如何实现定时任务的功能?
- 建小根堆。
- 采用时间轮方案。
我们这里采用的是时间轮的方案。通过多级时间轮来进行定时任务的实现。
如何延迟已经设定的定时非活跃连接销毁任务?(非活跃连接接收新数据后刷新定时销毁任务)
解决方案:类的析构函数+智能指针shared_ptr
- 封装一个定时任务对象,定时任务的执行放到析构函数中
- 使用shared_ptr管理定时任务对象,计数器为0时才释放。刷新定时任务只需要在后面继续添加该定时任务对象即可。
shared_ptr使用注意点:
-
如果多个shared_ptr都是对原始对象进行构造,计数都只会为1。因此要存储weak_ptr进行协助。
#include<iostream> #include<memory> class TimerTask { }; int main() { TimerTask* t1 = new TimerTask; { std::shared_ptr<TimerTask> pi1(t1); std::shared_ptr<TimerTask> pi2(t1); } return 0; }
如上面的代码所示,我们使用pi1和pi2管理原始对象t1,此时pi1和pi2的计数都为1,其中一个释放后会导致另一个对象管理的目标已经被释放了。而我们也不能存储shared_ptr,因为这会导致计数加1,所以要使用weak_ptr进行协助。
接口设计:
class TimerTask{
private:
uint64_t id; //唯一标识
uint32_t timeout; //超时时间
using TaskFunc = std::function<void()>; //定时任务
public:
TimerTask(); //构造
~TimerTask(); //析构
};
class TimerWheel{ //时间轮
private:
using WeakTask = std::weak_ptr<TimerTask>;
using PtrTask = std::shared_ptr<TimerTask>;
std::vector<std::vector<PtrTask>> _whell;
int _tick; //秒针
int _capacity; //表盘最大数量
std::unordered_map<uint64_t,weak_ptr<TimerTask>> _timers;
public:
//构造
//析构
//添加定时任务
//刷新定时任务
};
具体实现:
// 封装定时任务对象 通过析构函数和shared_ptr实现延迟销毁
class TimerTask
{
private:
uint64_t _id; // 标识该任务对象id
uint32_t _timeout; // 超时时间
TaskFunc _task_cb; // 要执行的定时任务
ReleaseFunc _release; // 用于删除timewheel中保存的定时任务对象信息
bool _canceled; // false 表示未被取消 true,表示取消
public:
TimerTask(uint64_t id, uint32_t delay, const TaskFunc &cb)
: _id(id), _timeout(delay), _task_cb(cb), _canceled(false)
{
}
~TimerTask()
{
// 析构的时候再执行定时任务
if (_canceled == false)
{
_task_cb();
_release();
}
}
void TimerCancle()
{
_canceled = true;
}
void SetRelease(const ReleaseFunc &re) // 设置
{
_release = re;
return;
}
uint32_t DelayTime()
{
return _timeout; // 获取延迟时间
}
};
class EventLoop;
// 时间轮
class TimeWheel
{
private:
using WeakTask = std::weak_ptr<TimerTask>;
using PtrTask = std::shared_ptr<TimerTask>;
int _tick; // 秒针,指到哪执行到哪里
int _capacity; // 表盘的最大数量,即最大的延迟时间
// 用二维数组存储时间段及该时间的定时任务
std::vector<std::vector<PtrTask>> _wheel;
// 建立定时任务id和weakptr的映射关系 涉及share_ptr的缺陷
std::unordered_map<uint64_t, WeakTask> _timers;
EventLoop *_loop;
int _timerfd; // 定时器描述 -- 可读事件回调就是读取计数器 执行定时任务
std::unique_ptr<Channel> _timer_channel;
private:
void RemoveTimer(uint64_t id)
{
auto it = _timers.find(id);
if (it != _timers.end()) // 找到该定时任务
{
_timers.erase(it);
}
return;
}
static int CreateTimerfd()
{
int timerfd = timerfd_create(CLOCK_MONOTONIC, 0);
if (timerfd < 0)
{
ERR_LOG("timerfd_create FAILED");
return -1;
}
// int timerfd_settime(int fd, int flags,const struct itimerspec *new_value,struct itimerspec *old_value);
struct itimerspec itime;
itime.it_value.tv_sec = 1;
itime.it_value.tv_nsec = 0;
itime.it_interval.tv_sec = 1;
itime.it_interval.tv_nsec = 0;
timerfd_settime(timerfd, 0, &itime, NULL);
return timerfd;
}
void ReadTimerfd()
{
uint64_t times;
int ret = read(_timerfd, ×, 8);
if (ret < 0)
{
ERR_LOG("READ TIMEFD FAILED");
abort();
}
return;
}
void OnTime()
{
ReadTimerfd();
RunTimerTask();
}
void TimerAddInLoop(uint64_t id, uint32_t delay, const TaskFunc &cb) // 添加定时任务
{
PtrTask pt(new TimerTask(id, delay, cb));
pt->SetRelease(std::bind(&TimeWheel::RemoveTimer, this, id)); // 为什么要&
_timers[id] = WeakTask(pt);
int pos = (_tick + delay) % _capacity; // 循环队列一样
_wheel[pos].push_back(pt); // 插入延迟任务
}
void TimerCancleInLoop(uint64_t id)
{
auto it = _timers.find(id);
if (it == _timers.end())
{
return; // 没该定时任务,返回
}
// 找到后调用
PtrTask pt = it->second.lock(); //获取智能指针的时候,需要判断获取的智能指针是否是有管理对象的,
if (pt) pt->TimerCancle();
return;
}
void TimerRefreshInLoop(uint64_t id)
{ // 刷新/延迟定时任务
// 通过保存的定时器对象中的weak_ptr构造一个share_ptr,添加到轮子
auto it = _timers.find(id);
if (it == _timers.end())
{
return; // 没该定时任务,返回
}
PtrTask pt = it->second.lock(); // lock获取weak_ptr管理的对象对应的shared_ptr
uint32_t delay = pt->DelayTime();
int pos = (_tick + delay) % _capacity; // 循环队列一样
_wheel[pos].push_back(pt); // 插入延迟任务
}
public:
TimeWheel(EventLoop *loop)
: _capacity(60), _tick(0), _wheel(_capacity), _loop(loop), _timerfd(CreateTimerfd()), _timer_channel(new Channel(_loop, _timerfd))
{
_timer_channel->SetReadCallBack(std::bind(&TimeWheel::OnTime, this));
_timer_channel->EnableRead(); // 启动读事件监控
}
// 定时器的操作有可能在多线程中执行,得保证线程安全
// 不想加锁,就得把操作都放到一个线程中执行
void TimerAdd(uint64_t id, uint32_t delay, const TaskFunc &cb);
void TimerRefresh(uint64_t id);
void TimerCancle(uint64_t id);
bool HashTimer(uint64_t id)
{
auto it = _timers.find(id);
if (it == _timers.end())
{
return false; // 没该定时任务,返回
}
return true;
}
void RunTimerTask()
{ // 执行定时任务,应该每秒执行一次,相当于秒针走一步
_tick = (_tick + 1) % _capacity;
_wheel[_tick].clear(); // 清空该位置的定时任务,自动调用定时任务对象的析构
}
};
Poller模块:
编写思路:
通过epoll实现对描述符的IO事件监控。
- 添加描述符的事件监控
- 修改描述符的事件监控
- 移除描述符的事件监控
1和2可整合,存在则修改,不存在则添加
如何封装?
- 用于一个epoll的操作句柄(即文件描述符)
- 拥有一个struct epoll_event 结构数组,监控保存所有的活跃事件
- 使用hash表管理描述符与对应的事件管理Channel对象
接口设计:
class Poller{
private:
int _epfd;
struct epoll_event _evs[MAX_EPOLLEVENTS];
std::uinordered_map<int,Channel*>
private:
//1.判断要更新的事件的描述符是否存在
//2.对epoll直接操作
public:
//1.添加或更新描述符所监控的事件
//2.移除描述符的监控
//3.开始监控,获取就绪的Channel
};
具体实现:
#define MAX_EPOLLEVENTS 1024
// poller模块
class Poller
{
private:
int _epfd;
struct epoll_event _evs[MAX_EPOLLEVENTS];
std::unordered_map<int, Channel *> _channels;
private:
// 对epoll的直接操作
void UpDate(Channel *channel, int op)
{
// int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int fd = channel->Fd();
struct epoll_event ep;
ep.data.fd = fd;
ep.events = channel->Event();
int ret = epoll_ctl(_epfd, op, fd, &ep);
if (ret < 0)
{
ERR_LOG("EPOLL CTL ERR");
}
return;
}
// 判断一个Channel是否添加了监控
bool HasChannel(Channel *channel)
{
auto it = _channels.find(channel->Fd());
if (it == _channels.end())
{
return false;
}
return true;
}
public:
Poller()
{
// int epoll_create(int size); the size argument is ignored, but must be greater than zero;
int ret = epoll_create(1);
if (ret < 0)
{
ERR_LOG("EPOLL CREATE ERR");
abort(); // 创建失败 终止程序
}
_epfd = ret;
}
// 1.添加或修改监控事件
void UpDateEvent(Channel *channel)
{
bool ret = HasChannel(channel);
if (ret == false)
{ // 不存在 添加
_channels.insert(std::make_pair(channel->Fd(), channel));
return UpDate(channel, EPOLL_CTL_ADD);
}
// 存在更新
return UpDate(channel, EPOLL_CTL_MOD);
}
// 2.移除监控
void ReMoveEvent(Channel *channel)
{
auto it = _channels.find(channel->Fd());
if (it != _channels.end())
{
_channels.erase(it);
}
UpDate(channel, EPOLL_CTL_DEL);
}
// 3.开始监听 ,返回活跃连接
void Poll(std::vector<Channel *> *active)
{
// int epoll_wait(int epfd, struct epoll_event *events,int maxevents, int timeout);
int nfds = epoll_wait(_epfd, _evs, MAX_EPOLLEVENTS, -1);
if (nfds < 0)
{
if (errno == EINTR)
{
return;
}
ERR_LOG("EPOLL WAIT ERROR:%S\n", strerror(errno));
abort();
}
for (int i = 0; i < nfds; ++i)
{
auto it = _channels.find(_evs[i].data.fd);
assert(it != _channels.end());
it->second->SetEvents(_evs[i].events);
active->push_back(it->second);
}
}
};
EventLoop模块:
编写思路:
进行事件监控以及事件处理的模块,此模块是与线程一一对应的。
如何保证一个连接的所有操作都在eventloop对应的线程中?
- 给eventloop模块中添加一个任务队列。
- 对连接的所有操作都进行一次封装,对连接的操作当作任务添加到任务队列中。
- 等所有就绪事件处理完,然后从task任务队列中一一取出进行操作执行
因此我们只需要给任务队列一把锁即可。
接口设计:
class EventLoop{
private:
std::thread::id _thread_id; //线程id
int _event_fd; //eventfd唤醒IO事件监控有可能导致的阻塞
Poller _poller; //进行所有描述符的事件监控
using Functor = std::function<void()>;
std::vector<Functor> _tasks; //任务池
std::mutex _mutex; //实现任务池操作的线程安全
private:
//执行所有的任务
public:
EventLoop();
//1. 判断将要执行的任务是否处于当前线程中,如果是则执行,不是则压入队列。
//2.将操作压入任务池
//3.判断当前线程是否是eventLoop对应的线程
//4.添加/修改描述符的事件监控
//5.移除描述符的监控
//6.启动
};
具体实现:
class EventLoop
{
private:
using Functor = std::function<void()>;
std::thread::id _thread_id; // 线程ID
int _event_fd; // eventfd唤醒IO事件监控可能导致的阻塞
Poller _poller; // 进行所有描述符的事件监控
std::unique_ptr<Channel> _event_channel;
std::vector<Functor> _tasks; // 任务池
std::mutex _mutex; // 实现任务池操作的线程安全
TimeWheel _timer_wheel;
public:
// 执行任务池中的所有任务
void RunAllTask()
{
std::vector<Functor> functor;
{
std::unique_lock<std::mutex> _lock(_mutex);
_tasks.swap(functor);
}
for (auto &f : functor)
{
f();
}
return;
}
static int CreateEventFd()
{
int efd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
if (efd < 0)
{
ERR_LOG("CREATE EVENTFD FAILED!!");
abort();
}
return efd;
}
void ReadEventfd()
{
uint64_t res = 0;
int ret = read(_event_fd, &res, sizeof(res));
if (ret < 0)
{ // EINTER 表示被信号打断 EAGAIN 表示无数据可读
if (errno == EINTR)
{
return;
}
ERR_LOG("READ EVENTFD FAILED");
abort();
}
return;
}
void WakeUpEventFd()
{
uint64_t val = 1;
int ret = write(_event_fd, &val, sizeof(val));
if (ret < 0)
{
if (errno == EINTR)
{
return;
}
ERR_LOG("READ EVENTFD FAILED!");
abort();
}
return;
}
public:
EventLoop()
: _thread_id(std::this_thread::get_id()),
_event_fd(CreateEventFd()), _poller(), _event_channel(new Channel(this, _event_fd)), _timer_wheel(this)
{
_event_channel->SetReadCallBack(std::bind(&EventLoop::ReadEventfd, this));
// 启动eventfd的读事件监控
_event_channel->EnableRead();
}
void RunInLoop(const Functor &cb)
{ // 当前任务在当前线程即执行,不是则压入队列
if (IsInLoop())
{
return cb();
}
return QueueInLoop(cb);
}
void QueueInLoop(const Functor &cb)
{ // 将操作压入任务池
{
std::unique_lock<std::mutex> _lock(_mutex);
_tasks.push_back(cb);
}
// 唤醒有可能以为没事情就绪,导致的epoll阻塞
// 其实就是给eventfd写一个数据,eventfd就会触发可读事件
WakeUpEventFd();
}
bool IsInLoop()
{ // 判断当前线程是否是EventLoop对应的线程
return _thread_id == std::this_thread::get_id();
}
void AssertInLoop(){
assert(_thread_id == std::this_thread::get_id());
}
void UpdateEvent(Channel *channel)
{ // 添加/修改描述符的事件监控
_poller.UpDateEvent(channel);
}
void RemoveEvent(Channel *channel)
{ // 移除监控
_poller.ReMoveEvent(channel);
}
void TimerAdd(uint64_t id, uint32_t delay, const TaskFunc &cb)
{
return _timer_wheel.TimerAdd(id, delay, cb);
}
void TimerRefresh(uint64_t id)
{
return _timer_wheel.TimerRefresh(id);
}
void TimerCancle(uint64_t id)
{
return _timer_wheel.TimerCancle(id);
}
bool HasTimer(uint64_t id)
{
return _timer_wheel.HashTimer(id);
}
void Start()
{ // 事件监控 --- 就绪事件处理 ---- 执行任务
// 1.事件监控
while(1){
std::vector<Channel *> actives;
_poller.Poll(&actives);
// 2.事件处理
for (auto &channel : actives)
{
channel->HandleEvent();
}
// 3.执行任务
RunAllTask();
}
}
};
TcpServer模块:
编写思路:
对所有模块的整合,通过TcpServer模块实例化对象,可以非常简单的完成一个服务器的搭建。
接口设计:
class TcpServer{
private:
int _port;
uint64_t _conn_id; //自动增长的连接Id
Acceptor _acceptor; //监听套接字的管理对象
EventLoop _baseloop; //这是主线程的EventLoop对象,负责监听事件的处理
LoopThreadPool _pool; //这是从属EventLoop线程池
int _timeout; //这是非活跃连接的统计事件 -- 多长时间无通信就是非活跃连接
bool _enable_inactive_release; //释放需要释放非活跃连接的标志
std::unordered_map<uint64_t,PtrConnection> _conns; //保存管理所有连接对应的shared_ptr对象;
using Functor = std::function<void()>; //用户设置的定时任务
//这四个回调函数是由组件使用者来设置的
using ConnectedCallback = std::function<void(const PtrConnection&)>;
using MessageCallback = std::function<void(const PtrConnection&,Buffer*)>;
using ClosedCallback = std::function<void(const PtrConnection&)>;
using AnyEventCallback = std::function<void(const PtrConnection&)>;
ConnectedCallback _connected_callback;
MessageCallback _message_callback;
ClosedCallback _close_callback;
AnyEventCallback _event_callback;
public:
//构造
//析构
//设置从属线程数量
//设置回调
//定时任务添加
//启动服务器
具体实现:
class TcpServer{
private:
int _port;
uint64_t _conn_id; //自动增长的连接Id
Acceptor _acceptor; //监听套接字的管理对象
EventLoop _baseloop; //这是主线程的EventLoop对象,负责监听事件的处理
LoopThreadPool _pool; //这是从属EventLoop线程池
int _timeout; //这是非活跃连接的统计事件 -- 多长时间无通信就是非活跃连接
bool _enable_inactive_release; //释放需要释放非活跃连接的标志
std::unordered_map<uint64_t,PtrConnection> _conns; //保存管理所有连接对应的shared_ptr对象;
using Functor = std::function<void()>; //用户设置的定时任务
//这四个回调函数是由组件使用者来设置的
using ConnectedCallback = std::function<void(const PtrConnection&)>;
using MessageCallback = std::function<void(const PtrConnection&,Buffer*)>;
using ClosedCallback = std::function<void(const PtrConnection&)>;
using AnyEventCallback = std::function<void(const PtrConnection&)>;
ConnectedCallback _connected_callback;
MessageCallback _message_callback;
ClosedCallback _close_callback;
AnyEventCallback _event_callback;
private:
void NewConnection(int fd){ //给新连接构造一个Connection管理
++_conn_id;
PtrConnection conn(new Connection(_pool.NextLoop(),_conn_id,fd));
conn->SetMessageCallback(_message_callback);
conn->SetClosedCallback(_close_callback);
conn->SetConnectedCallback(_connected_callback);
conn->SetAnyEventCallback(_event_callback);
conn->SetServerClosedCallback(std::bind(&TcpServer::RemoveConnection,this,std::placeholders::_1));
if(_enable_inactive_release) conn->EnableInactiveRelease(_timeout); //启动超时销毁
conn->Established(); //就绪初始化
_conns.insert(std::make_pair(_conn_id,conn));
}
void RemoveConnectionInLoop(const PtrConnection& conn){
int id = conn->ID();
auto it =_conns.find(id);
if(it != _conns.end()){
_conns.erase(it);
}
}
void RunAfterInLoop(const Functor& task,int delay){
_conn_id++;
_baseloop.TimerAdd(_conn_id,delay,task);
}
public:
TcpServer(int port)
:_port(port),
_conn_id(0),
_enable_inactive_release(false)
,_acceptor(&_baseloop,_port)
,_pool(&_baseloop)
{
_acceptor.SetAcceptCallback(std::bind(&TcpServer::NewConnection,this,std::placeholders::_1));
_acceptor.Listen(); //将监听套接字挂到baseloop上开始监控
}
void SetThreadCount(int count){
return _pool.SetThreadCount(count);
}
void SetConnectedCallback(const ConnectedCallback& cb){
_connected_callback = cb;
}
void SetMessageCallback(const MessageCallback& cb){
_message_callback = cb;
}
void SetClosedCallback(const ClosedCallback& cb){
_close_callback = cb;
}
void SetAnyEventCallback(const ConnectedCallback& cb){
_event_callback = cb;
}
void EnableInactiveRelease(int timeout){
_timeout = timeout;
_enable_inactive_release = true;
}
void RunAfter(const Functor& task,int delay){
_baseloop.RunInLoop(std::bind(&TcpServer::RunAfterInLoop,this,task,delay));
}
void RemoveConnection(const PtrConnection& conn){ //从管理Connection的_conns中移除连接信息
_baseloop.RunInLoop(std::bind(&TcpServer::RemoveConnectionInLoop,this,conn));
}
void Start(){
_pool.Create(); //创建线程池中的从属线程
return _baseloop.Start();
}
};
协议模块
Util模块:
具体实现:
class Util{
public:
//字符串分割函数 将src字符串按照sep字符进行分割 得到各个字符串放到arry中,最终返回子串的数量
static size_t Split(const std::string &src,const std::string& sep,std::vector<std::string>* arry){
size_t offset = 0;
while(offset < src.size()){
size_t pos = src.find(sep,offset); //在src字符串偏移量offset处 开始向后查找seo字符/子串,返回查找的位置
if(pos == std::string::npos){ //未找到特定的字符
//将剩余的部分当作一个子串 放入arry中
arry->push_back(src.substr(offset));
return arry->size();
}
if(pos == offset) {
offset = pos + sep.size();
continue; //当前子串是空的
}
arry->push_back(src.substr(offset,pos - offset));
offset = pos + sep.size();
}
return arry->size();
}
//读取文件的所有内容,将读取的内容放入一个Buffer中
static bool ReadFile(const std::string &filename,std::string* buf){
std::ifstream ifs(filename,std::ios::binary);
if(ifs.is_open() == false){
ERR_LOG("open failed!!",filename.c_str());
return false;
}
size_t fsize = 0;
ifs.seekg(0,ifs.end); //跳到读写位置的末尾
fsize = ifs.tellg(); //获取读写位置相对于起始位置的偏移量 末尾位置偏移量即文件大小
ifs.seekg(0,ifs.beg); //跳到起始位置
buf->resize(fsize);
ifs.read(&(*buf)[0],fsize);
if(ifs.good() == false){
ERR_LOG("READ %s FILE FAILED!",filename.c_str());
ifs.close();
return false;
}
ifs.close();
return true;
}
//向文件写入数据
static bool WriteFile(const std::string &filename,const std::string& buf){
std::ofstream ofs(filename,std::ios::binary | std::ios::trunc);
if(ofs.is_open() == false){
ERR_LOG("open failed!!",filename.c_str());
return false;
}
ofs.write(buf.c_str(),buf.size());
if(ofs.good() == false){
ERR_LOG("WRITE %s FILE FAILED!",filename.c_str());
ofs.close();
return false;
}
ofs.close();
return true;
}
//URL编码 避免URL中资源路径与查询字符串中的特殊字符与Http请求中特殊字符产生歧义
//编码格式:将特殊字符的ascii值转换为两个16进制字符 前缀% c++ -> c%2B%2B
//不编码的特殊字符 RFC3986文档规定 . - _ ~ 字母,数字属于绝对不编码字符
//RFC3986文档规定 编码格式 %HH
//W3C文档中规定 查询字符串中的空格需要被编码成 + 解码则是+转空格
static std::string UrlEncode(const std::string url,bool convert_space_to_plus){
std::string res;
for(auto c : url){
if(c == '.' || c == '-' || c=='_' || c == '~' ||isalnum(c)){
res +=c;
continue;
}
if(c == ' ' && convert_space_to_plus){
res +='+';
continue;
}
//剩下的字符都需要编码成%HH格式
char tmp[4] = {0};
snprintf(tmp,4,"%%%02X",c);
res += tmp;
}
return res;
}
static int HEXTOI(char c){ //16进制转整型
if(c >='0' && c <='9'){
return c-'0';
}
else if(c >= 'a' && c <='z'){
return c - 'a' + 10;
}
else if(c>='A' && c<='Z'){
return c - 'A' + 10;
}
return -1;
}
//URL解码
static std::string UrlDecode(const std::string url,bool convert_plus_to_space){
//遇到了%,则将紧随其后的2个字符,转换成数字,第一个数字左移4位,然后加上第二个数字 + %2b = 2<<4 +11;
std::string res;
for(int i = 0;i<url.size();++i){
if(url[i] == '+' && convert_plus_to_space){
res +=' ';
}
if(url[i] == '%' && (i+2) < url.size()){
char v1 = HEXTOI(url[i+1]);
char v2 = HEXTOI(url[i+2]);
char v = (v1<<4) + v2;
res +=v;
i+=2;
continue;
}
res += url[i];
}
return res;
}
//响应状态码的描述信息获取
static std::string StatueDesc(int statu){
std::unordered_map<int,std::string> _statu_msg = {
{100, "Continue"},
{101, "Switching Protocol"},
{102, "Processing"},
{103, "Early Hints"},
{200, "OK"},
{201, "Created"},
{202, "Accepted"},
{203, "Non-Authoritative Information"},
{204, "No Content"},
{205, "Reset Content"},
{206, "Partial Content"},
{207, "Multi-Status"},
{208, "Already Reported"},
{226, "IM Used"},
{300, "Multiple Choice"},
{301, "Moved Permanently"},
{302, "Found"},
{303, "See Other"},
{304, "Not Modified"},
{305, "Use Proxy"},
{306, "unused"},
{307, "Temporary Redirect"},
{308, "Permanent Redirect"},
{400, "Bad Request"},
{401, "Unauthorized"},
{402, "Payment Required"},
{403, "Forbidden"},
{404, "Not Found"},
{405, "Method Not Allowed"},
{406, "Not Acceptable"},
{407, "Proxy Authentication Required"},
{408, "Request Timeout"},
{409, "Conflict"},
{410, "Gone"},
{411, "Length Required"},
{412, "Precondition Failed"},
{413, "Payload Too Large"},
{414, "URI Too Long"},
{415, "Unsupported Media Type"},
{416, "Range Not Satisfiable"},
{417, "Expectation Failed"},
{418, "I'm a teapot"},
{421, "Misdirected Request"},
{422, "Unprocessable Entity"},
{423, "Locked"},
{424, "Failed Dependency"},
{425, "Too Early"},
{426, "Upgrade Required"},
{428, "Precondition Required"},
{429, "Too Many Requests"},
{431, "Request Header Fields Too Large"},
{451, "Unavailable For Legal Reasons"},
{501, "Not Implemented"},
{502, "Bad Gateway"},
{503, "Service Unavailable"},
{504, "Gateway Timeout"},
{505, "HTTP Version Not Supported"},
{506, "Variant Also Negotiates"},
{507, "Insufficient Storage"},
{508, "Loop Detected"},
{510, "Not Extended"},
{511, "Network Authentication Required"},
};
auto it = _statu_msg.find(statu);
if(it != _statu_msg.end()){
return it->second;
}
else "UNKNOW STATU";
}
//根据文件后缀名获取文件mime
static std::string FileNameExt(const std::string& filename){
std::unordered_map<std::string,std::string>_mime_msg = {
{".aac", "audio/aac"},
{".abw", "application/x-abiword"},
{".arc", "application/x-freearc"},
{".avi", "video/x-msvideo"},
{".azw", "application/vnd.amazon.ebook"},
{".bin", "application/octet-stream"},
{".bmp", "image/bmp"},
{".bz", "application/x-bzip"},
{".bz2", "application/x-bzip2"},
{".csh", "application/x-csh"},
{".css", "text/css"},
{".csv", "text/csv"},
{".doc", "application/msword"},
{".docx", "application/vnd.openxmlformats-officedocument.wordprocessingml.document"},
{".eot", "application/vnd.ms-fontobject"},
{".epub", "application/epub+zip"},
{".gif", "image/gif"},
{".htm", "text/html"},
{".html", "text/html"},
{".ico", "image/vnd.microsoft.icon"},
{".ics", "text/calendar"},
{".jar", "application/java-archive"},
{".jpeg", "image/jpeg"},
{".jpg", "image/jpeg"},
{".js", "text/javascript"},
{".json", "application/json"},
{".jsonld", "application/ld+json"},
{".mid", "audio/midi"},
{".midi", "audio/x-midi"},
{".mjs", "text/javascript"},
{".mp3", "audio/mpeg"},
{".mpeg", "video/mpeg"},
{".mpkg", "application/vnd.apple.installer+xml"},
{".odp", "application/vnd.oasis.opendocument.presentation"},
{".ods", "application/vnd.oasis.opendocument.spreadsheet"},
{".odt", "application/vnd.oasis.opendocument.text"},
{".oga", "audio/ogg"},
{".ogv", "video/ogg"},
{".ogx", "application/ogg"},
{".otf", "font/otf"},
{".png", "image/png"},
{".pdf", "application/pdf"},
{".ppt", "application/vnd.ms-powerpoint"},
{".pptx", "application/vnd.openxmlformats-officedocument.presentationml.presentation"},
{".rar", "application/x-rar-compressed"},
{".rtf", "application/rtf"},
{".sh", "application/x-sh"},
{".svg", "image/svg+xml"},
{".swf", "application/x-shockwave-flash"},
{".tar", "application/x-tar"},
{".tif", "image/tiff"},
{".tiff", "image/tiff"},
{".ttf", "font/ttf"},
{".txt", "text/plain"},
{".vsd", "application/vnd.visio"},
{".wav", "audio/wav"},
{".weba", "audio/webm"},
{".webm", "video/webm"},
{".webp", "image/webp"},
{".woff", "font/woff"},
{".woff2", "font/woff2"},
{".xhtml", "application/xhtml+xml"},
{".xls", "application/vnd.ms-excel"},
{".xlsx", "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"},
{".xml", "application/xml"},
{".xul", "application/vnd.mozilla.xul+xml"},
{".zip", "application/zip"},
{".3gp", "video/3gpp"},
{".3g2", "video/3gpp2"},
{".7z", "application/x-7z-compressed"},
};
size_t pos = filename.find_last_of('.');
if(pos == std::string::npos){
return "applaction/octet-stream";
}
std::string ext = filename.substr(pos); //获取扩展名
auto it = _mime_msg.find(ext);
if(it == _mime_msg.end()){
return "applaction/octet-stream";
}
return it->second;
}
//判断是否是目录
static bool IsDirectory(const std::string& filename){
struct stat st;
int ret = stat(filename.c_str(),&st);
if(ret<0){
return false;
}
return S_ISDIR(st.st_mode);
}
//判断是否是普通文件
static bool IsRegular(const std::string& filename){
struct stat st;
int ret = stat(filename.c_str(),&st);
if(ret<0){
return false;
}
return S_ISREG(st.st_mode);
}
//http请求的资源路径是否有效
// /index.html --- / 相对根目录
//客户端只能请求相对跟目录的资源,其他地方的资源都不理会 遇到..对层数减一
static bool ValiedPath(const std::string& path){ //计算目录深度
std::vector<std::string> subdir;
Split(path,"/",&subdir);
int level = 0;
for(auto &dir :subdir){
if(dir == ".."){
level--;
if(level <0){
return false;
}
continue;
}
level++;
}
return true;
}
};
HttpRequest模块:
具体实现:
class HttpRequest{
public:
std::string _method; //请求方法
std::string _path; //资源路径
std::string _version; //协议版本
std::string _body; // 请求正文
std::smatch _matches; //资源路径正则提取数据
std::unordered_map<std::string,std::string> _headrs; //头部字段
std::unordered_map<std::string,std::string> _params; //查询字符串
public:
void ReSet(){
_method.clear();
_path.clear();
_version.clear();
_body.clear();
std::smatch match;
_matches.swap(match);
_headrs.clear();
_params.clear();
}
//插入头部字段
void SetHeader(const std::string& key ,const std::string& val){
_headrs.insert(std::make_pair(key,val));
}
//判断是否存在指定头部字段
bool HasHeader(const std::string &key) const {
auto it = _headrs.find(key);
if (it == _headrs.end()) {
return false;
}
return true;
}
//获取指定头部字段的值
std::string GetHeader(const std::string &key) const {
auto it = _headrs.find(key);
if (it == _headrs.end()) {
return "";
}
return it->second;
}
//插入查询字符串
void SetParam(std::string& key,std::string& val){
_params.insert(std::make_pair(key,val));
}
//判断是否存在查询字符串
bool HasParam(const std::string& key){
auto it = _params.find(key);
if(it == _params.end()){
return false;
}
return true;
}
//获取指定查询字符串的值
std::string GetParam(const std::string& key){
auto it = _params.find(key);
if(it == _params.end()){
return "";
}
return it->second;
}
//获取正文长度
size_t ContentLength(){
//Content-Length
bool ret = HasHeader("Content-Length");
if(ret == false){
return 0;
}
std::string clen = GetHeader("Content-Length");
return std::stol(clen);
}
//判断是否是短链接
bool Close() const {
// 没有Connection字段,或者有Connection但是值是close,则都是短链接,否则就是长连接
if (HasHeader("Connection") == true && GetHeader("Connection") == "keep-alive") {
return false;
}
return true;
}
};
HttpResponse模块:
具体实现:
class HttpResponse{
public:
int _statu;
bool _redirect_flag;
std::string _body;
std::string _redirect_url;
std::unordered_map<std::string,std::string> _headers;
public:
HttpResponse():_redirect_flag(false),_statu(200) {}
HttpResponse(int statu):_redirect_flag(false),_statu(statu) {}
void Rest(){
_statu = 200;
_redirect_flag = false;
_body.clear();
_redirect_url.clear();
_headers.clear();
}
//插入头部字段
//插入头部字段
void SetHeader(const std::string& key ,const std::string& val){
_headers.insert(std::make_pair(key,val));
}
//判断是否存在头部字段
bool HasHeader(const std::string& key){
auto it = _headers.find(key);
if(it == _headers.end()){
return false;
}
return true;
}
//获取指定头部字符串的值
std::string GetHeader(const std::string& key){
auto it = _headers.find(key);
if(it == _headers.end()){
return "";
}
return it->second;
}
void SetContent(std::string body,const std::string& type){
_body = body;
SetHeader("Content-Type",type);
}
void SerRedirect(std::string& url,int statu = 302){
_statu = statu;
_redirect_flag = true;
_redirect_url = url;
}
//判断是否是短链接
bool Close(){
//没有Connection字段 或者字段是close 都是短连接
if(HasHeader("Connection") == true && GetHeader("Connection") == "keep-alive"){
return true;
}
return false;
}
};
请求接收上下文模块:
具体实现:
enum HttpRecvStatu{
RECV_HTTP_ERROR,
RECV_HTTP_LINE,
RECV_HTTP_HEAD,
RECV_HTTP_BODY,
RECV_HTTP_OVER
};
#define MAX_LINE 8192
class HttpContext{
private:
int _resp_statu; //响应状态码
HttpRecvStatu _recv_statu;
HttpRequest _request;
private:
bool ParseHttpLine(const std::string& line){
std::smatch matches;
//请求方法匹配 GET HEAD POST PUT DELETE 。。。。
std::regex e("(GET|HEAD|POST|PUT|DELETE) ([^?]*)(?:\\?(.*))? (HTTP/1\\.[01])(?:\n|\r|\r\n)?") ;
bool ret = std::regex_match(line,matches,e);
if(ret == false){
_recv_statu = RECV_HTTP_ERROR;
_resp_statu = 400; //BAD REQUEST
return false;
}
_request._method = matches[1]; //请求方法的获取
_request._path = Util::UrlDecode(matches[2],false); //资源路径的获取 需要解码操作 不需要+转空格
_request._version = matches[4]; //协议版本的获取
std::vector<std::string> query_string_arry;
std::string query_string = matches[3]; //查询字符串的处理
Util::Split(query_string,"&",&query_string_arry); // key=val&key=val 先以&分割
for(auto &str : query_string_arry){
size_t pos = str.find("=");
if(pos == std::string::npos){
_recv_statu = RECV_HTTP_ERROR;
_resp_statu = 400; //BAD REQUEST
return false;
}
std::string key = Util::UrlDecode(str.substr(0,pos),true); //以等会分割,并且解码 需要=转空格
std::string val = Util::UrlDecode(str.substr(pos+1),true);
_request.SetParam(key,val);
}
}
bool RecvHttpLine(Buffer* buf){
if(_recv_statu != RECV_HTTP_LINE) return false;
//获取一行数据 可能 缓冲区数据不足一行 获取的一行数据超大
std::string line = buf->GetLineAndPop();
if(line.size() == 0){
//缓冲区数据不足一行,则需要判断缓冲区数据可读长度,如果很长都不足一行 有问题
if(buf->ReadAbleSize() > MAX_LINE){
_recv_statu = RECV_HTTP_ERROR;
_resp_statu = 414; //URL TOO LONG
return false;
}
//等新数据到来
return true;
}
bool ret = ParseHttpLine(line);
if(ret == false){
return false;
}
buf->MoveReadOffset(line.size());
_recv_statu = RECV_HTTP_HEAD; //首行处理完毕 进入头部获取阶段
return true;
}
bool RecvHttpHead(Buffer* buf){ //一行行取出数据 遇到空行为止 key: /r/n.../r/n
if(_recv_statu!= RECV_HTTP_HEAD){
return false;
}
while(1){
//获取一行数据 可能 缓冲区数据不足一行 获取的一行数据超大
std::string line = buf->GetLineAndPop();
if(line.size() == 0){
//缓冲区数据不足一行,则需要判断缓冲区数据可读长度,如果很长都不足一行 有问题
if(buf->ReadAbleSize() > MAX_LINE){
_recv_statu = RECV_HTTP_ERROR;
_resp_statu = 414; //URL TOO LONG
return false;
}
//等新数据到来
return true;
}
if(line == "\n" || line == "\r\n"){
break;
}
bool ret = ParseHttpHead(line);
if(ret == false){
return false;
}
}
_recv_statu = RECV_HTTP_BODY; //头部获取结束 进入正文获取阶段
return true;
}
bool ParseHttpHead(const std::string& line){
size_t pos = line.find(": ");
if(pos == std::string::npos){
_recv_statu = RECV_HTTP_ERROR;
_resp_statu = 400; //URL TOO LONG
return false;
}
std::string key =line.substr(0,pos);
std::string val = line.substr(pos+2);
_request.SetHeader(key,val);
return true;
}
bool RecvHttpBody(Buffer* buf){
if(_recv_statu != RECV_HTTP_BODY) return false;
//1。获取正文长度
size_t content_length = _request.ContentLength();
if(content_length == 0){
//无正文,接收完毕
_recv_statu = RECV_HTTP_OVER;
return true;
}
//2.当前已经接收了多少正文 取决于_request.body
size_t real_len = content_length - _request._body.size(); //实际还需接收的正文长度
//3.接收正文放到body中,但是也要考虑当前缓冲区中的数据是否是全部的正文。
//3.1 缓冲区中数据,包含了当前请求的所有正文,则取出所需的数据
if(buf->ReadAbleSize() >= real_len){
_request._body.append(buf->ReadPosition(),real_len);
buf->MoveReadOffset(real_len);
_recv_statu = RECV_HTTP_OVER;
return true;
}
//3.2 缓冲区中数据,无法满足当前正文的需要,数据不足,取出数据,然后等待新数据到来
_request._body.append(buf->ReadPosition(),buf->ReadAbleSize());
buf->MoveReadOffset(buf->ReadAbleSize());
return true;
}
public:
HttpContext():_resp_statu(200),_recv_statu(RECV_HTTP_LINE){};
int RespStatu(){
return _resp_statu;
}
void ReSet(){
_resp_statu = 200;
_recv_statu = RECV_HTTP_LINE;
_request.ReSet();
}
HttpRecvStatu RecvStatu(){
return _recv_statu;
}
HttpRequest& Request(){
return _request;
}
void RecvHttpRequest(Buffer* buf){
//不同状态做不同的事情,这里不要break,
switch(_recv_statu){
case RECV_HTTP_LINE: RecvHttpLine(buf);
case RECV_HTTP_HEAD: RecvHttpHead(buf);
case RECV_HTTP_BODY: RecvHttpBody(buf);
}
return;
}
};
HttpServer模块
具体实现:
#define DEFALT_TIMEOUT 30
class HttpServer {
private:
using Handler = std::function<void(const HttpRequest &, HttpResponse *)>;
using Handlers = std::vector<std::pair<std::regex, Handler>>;
Handlers _get_route;
Handlers _post_route;
Handlers _put_route;
Handlers _delete_route;
std::string _basedir; //静态资源根目录
TcpServer _server;
private:
void ErrorHandler(const HttpRequest &req, HttpResponse *rsp) {
//1. 组织一个错误展示页面
std::string body;
body += "<html>";
body += "<head>";
body += "<meta http-equiv='Content-Type' content='text/html;charset=utf-8'>";
body += "</head>";
body += "<body>";
body += "<h1>";
body += std::to_string(rsp->_statu);
body += " ";
body += Util::StatueDesc(rsp->_statu);
body += "</h1>";
body += "</body>";
body += "</html>";
//2. 将页面数据,当作响应正文,放入rsp中
rsp->SetContent(body, "text/html");
}
//将HttpResponse中的要素按照http协议格式进行组织,发送
void WriteReponse(const PtrConnection &conn, const HttpRequest &req, HttpResponse &rsp) {
//1. 先完善头部字段
if (req.Close() == true) {
rsp.SetHeader("Connection", "close");
}else {
rsp.SetHeader("Connection", "keep-alive");
}
if (rsp._body.empty() == false && rsp.HasHeader("Content-Length") == false) {
rsp.SetHeader("Content-Length", std::to_string(rsp._body.size()));
}
if (rsp._body.empty() == false && rsp.HasHeader("Content-Type") == false) {
rsp.SetHeader("Content-Type", "application/octet-stream");
}
if (rsp._redirect_flag == true) {
rsp.SetHeader("Location", rsp._redirect_url);
}
//2. 将rsp中的要素,按照http协议格式进行组织
std::stringstream rsp_str;
rsp_str << req._version << " " << std::to_string(rsp._statu) << " " << Util::StatueDesc(rsp._statu) << "\r\n";
for (auto &head : rsp._headers) {
rsp_str << head.first << ": " << head.second << "\r\n";
}
rsp_str << "\r\n";
rsp_str << rsp._body;
//3. 发送数据
conn->Send(rsp_str.str().c_str(), rsp_str.str().size());
}
bool IsFileHandler(const HttpRequest &req) {
// 1. 必须设置了静态资源根目录
if (_basedir.empty()) {
return false;
}
// 2. 请求方法,必须是GET / HEAD请求方法
if (req._method != "GET" && req._method != "HEAD") {
return false;
}
// 3. 请求的资源路径必须是一个合法路径
if (Util::ValiedPath(req._path) == false) {
return false;
}
// 4. 请求的资源必须存在,且是一个普通文件
// 有一种请求比较特殊 -- 目录:/, /image/, 这种情况给后边默认追加一个 index.html
// index.html /image/a.png
// 不要忘了前缀的相对根目录,也就是将请求路径转换为实际存在的路径 /image/a.png -> ./wwwroot/image/a.png
std::string req_path = _basedir + req._path;//为了避免直接修改请求的资源路径,因此定义一个临时对象
if (req._path.back() == '/') {
req_path += "index.html";
}
if (Util::IsRegular(req_path) == false) {
return false;
}
return true;
}
//静态资源的请求处理 --- 将静态资源文件的数据读取出来,放到rsp的_body中, 并设置mime
void FileHandler(const HttpRequest &req, HttpResponse *rsp) {
std::string req_path = _basedir + req._path;
if (req._path.back() == '/') {
req_path += "index.html";
}
bool ret = Util::ReadFile(req_path, &rsp->_body);
if (ret == false) {
return;
}
std::string mime = Util::FileNameExt(req_path);
rsp->SetHeader("Content-Type", mime);
return;
}
//功能性请求的分类处理
void Dispatcher(HttpRequest &req, HttpResponse *rsp, Handlers &handlers) {
//在对应请求方法的路由表中,查找是否含有对应资源请求的处理函数,有则调用,没有则发挥404
//思想:路由表存储的时键值对 -- 正则表达式 & 处理函数
//使用正则表达式,对请求的资源路径进行正则匹配,匹配成功就使用对应函数进行处理
// /numbers/(\d+) /numbers/12345
for (auto &handler : handlers) {
const std::regex &re = handler.first;
const Handler &functor = handler.second;
bool ret = std::regex_match(req._path, req._matches, re);
if (ret == false) {
continue;
}
return functor(req, rsp);//传入请求信息,和空的rsp,执行处理函数
}
rsp->_statu = 404;
}
void Route(HttpRequest &req, HttpResponse *rsp) {
//1. 对请求进行分辨,是一个静态资源请求,还是一个功能性请求
// 静态资源请求,则进行静态资源的处理
// 功能性请求,则需要通过几个请求路由表来确定是否有处理函数
// 既不是静态资源请求,也没有设置对应的功能性请求处理函数,就返回405
if (IsFileHandler(req) == true) {
//是一个静态资源请求, 则进行静态资源请求的处理
return FileHandler(req, rsp);
}
if (req._method == "GET" || req._method == "HEAD") {
return Dispatcher(req, rsp, _get_route);
}else if (req._method == "POST") {
return Dispatcher(req, rsp, _post_route);
}else if (req._method == "PUT") {
return Dispatcher(req, rsp, _put_route);
}else if (req._method == "DELETE") {
return Dispatcher(req, rsp, _delete_route);
}
rsp->_statu = 405;// Method Not Allowed
return ;
}
//设置上下文
void OnConnected(const PtrConnection &conn) {
conn->SetContext(HttpContext());
DBG_LOG("NEW CONNECTION %p", conn.get());
}
//缓冲区数据解析+处理
void OnMessage(const PtrConnection &conn, Buffer *buffer) {
while(buffer->ReadAbleSize() > 0){
//1. 获取上下文
HttpContext *context = conn->GetContext()->get<HttpContext>();
//2. 通过上下文对缓冲区数据进行解析,得到HttpRequest对象
// 1. 如果缓冲区的数据解析出错,就直接回复出错响应
// 2. 如果解析正常,且请求已经获取完毕,才开始去进行处理
context->RecvHttpRequest(buffer);
HttpRequest &req = context->Request();
HttpResponse rsp(context->RespStatu());
if (context->RespStatu() >= 400) {
//进行错误响应,关闭连接
ErrorHandler(req, &rsp);//填充一个错误显示页面数据到rsp中
WriteReponse(conn, req, rsp);//组织响应发送给客户端
context->ReSet();
buffer->MoveReadOffset(buffer->ReadAbleSize());//出错了就把缓冲区数据清空
conn->Shutdown();//关闭连接
return;
}
if (context->RecvStatu() != RECV_HTTP_OVER) {
//当前请求还没有接收完整,则退出,等新数据到来再重新继续处理
return;
}
//3. 请求路由 + 业务处理
Route(req, &rsp);
//4. 对HttpResponse进行组织发送
WriteReponse(conn, req, rsp);
//5. 重置上下文
context->ReSet();
//6. 根据长短连接判断是否关闭连接或者继续处理
if (rsp.Close() == true) conn->Shutdown();//短链接则直接关闭
}
return;
}
public:
HttpServer(int port, int timeout = DEFALT_TIMEOUT):_server(port) {
_server.EnableInactiveRelease(timeout);
_server.SetConnectedCallback(std::bind(&HttpServer::OnConnected, this, std::placeholders::_1));
_server.SetMessageCallback(std::bind(&HttpServer::OnMessage, this, std::placeholders::_1, std::placeholders::_2));
}
void SetBaseDir(const std::string &path) {
assert(Util::IsDirectory(path) == true);
_basedir = path;
}
/*设置/添加,请求(请求的正则表达)与处理函数的映射关系*/
void Get(const std::string &pattern, const Handler &handler) {
_get_route.push_back(std::make_pair(std::regex(pattern), handler));
}
void Post(const std::string &pattern, const Handler &handler) {
_post_route.push_back(std::make_pair(std::regex(pattern), handler));
}
void Put(const std::string &pattern, const Handler &handler) {
_put_route.push_back(std::make_pair(std::regex(pattern), handler));
}
void Delete(const std::string &pattern, const Handler &handler) {
_delete_route.push_back(std::make_pair(std::regex(pattern), handler));
}
void SetThreadCount(int count) {
_server.SetThreadCount(count);
}
void Listen() {
_server.Start();
}
};
性能测试
采⽤webbench进⾏服务器性能测试。
webbench测试原理是,创建指定数量的进程,在进程中创建客户端向服务器发送请求,收到响应后关闭连接,开始下一个连接的建立。
性能测试的两个重点衡量标准:
- 并发量:可以同时处理多少客户端的请求而不会出现连接失败
- QPS:每秒钟处理的包的数量
测试环境:
服务器环境:2核2G带宽3M的云服务器,服务器程序采用1主3从reactor模式
webbench客户端环境:同一云服务器
使用webbench分别以500,5000,10000并发量向服务器发送请求,进行了一分钟测试。
测试结果
测试1:500个客⼾端连接的情况下测试结果:
[sola@hcss-ecs-e2f8 WebBench-master]$ ./webbench -c 500 -t 60 http://127.0.0.1:8088/
Webbench - Simple Web Benchmark 1.5
Copyright (c) Radim Kolar 1997-2004, GPL Open Source Software.
Request:
GET / HTTP/1.0
User-Agent: WebBench 1.5
Host: 127.0.0.1
Runing info: 500 clients, running 60 sec.
Speed=209176 pages/min, 1284062 bytes/sec.
Requests: 209176 susceed, 0 failed.
测试2:5000个客⼾端连接的情况下测试结果:
[sola@hcss-ecs-e2f8 WebBench-master]$ ./webbench -c 5000 -t 60 http://127.0.0.1:8088/
Webbench - Simple Web Benchmark 1.5
Copyright (c) Radim Kolar 1997-2004, GPL Open Source Software.
Request:
GET / HTTP/1.0
User-Agent: WebBench 1.5
Host: 127.0.0.1
Runing info: 5000 clients, running 60 sec.
Speed=200718 pages/min, 1232855 bytes/sec.
Requests: 200718 susceed, 0 failed.
测试3:10000个客⼾端连接的情况下测试结果:
[sola@hcss-ecs-e2f8 WebBench-master]$ ./webbench -c 10000 -t 60 http://127.0.0.1:8088/
Webbench - Simple Web Benchmark 1.5
Copyright (c) Radim Kolar 1997-2004, GPL Open Source Software.
Request:
GET / HTTP/1.0
User-Agent: WebBench 1.5
Host: 127.0.0.1
Runing info: 10000 clients, running 60 sec.
Speed=197783 pages/min, 1215718 bytes/sec.
Requests: 197783 susceed, 0 failed.
以上测试中,使⽤浏览器访问服务器,均能流畅获取请求的⻚⾯。但是根据测试结果能够看出,虽然并发量⼀直在提⾼,但是总的请求服务器的数量并没有增加,反⽽有所降低,侧⾯反馈了处理所耗时间更多了,基本上可以根据19w/min左右的请求量计算出10000并发量时服务器的极限了.
⽬前受限于设备环境配置,尚未进⾏更多并发量的测试.