实现Reactor反应堆模型
- 1 前言
- 2 框架搭建
- 3 准备工作
- 4 Reactor类的设计
- 5 Connection连接接口
- 6 回调方法
1 前言
到目前为止,我学习了计算机网络,了解了网络传输的过程,理解网络协议栈的层与层之间的关系。实践了使用TCP进行的网络编程,也了解了协议的编写,实际了http协议下的通信过程。
最近学习了五种IO模型,可以通过多路转接EPOLL提高读取效率。
那么现在是否可以将多路转接与网络结合,编写一个高效处理网络请求的反应堆模型Reactor。今天我们搭建基础的结构。
2 框架搭建
我们想要搭建的是这样的结构:
- 最底层是Reactor:负责事件派发,管理connection套接字连接。可以添加监听套接字与普通套接字,其中都有对应的回调方法。可以通过套接字类型赋予连接对应的回调方法。通过多路转接IO获取就绪事件,找到对应connection执行事件。
- Connection连接:管理文件描述符的连接对象,内部有这个文件描述符的输入输出缓冲区,回调函数,客户端信息,就绪事件集。等待Reactor调用回调方法。
- Listener监听:这是专门管理监听套接字的对象,里面有对于监听套接字的方法,可以获取新连接。作为监听套接字connection的回调方法
- HandlerConnection普通套接字 :这是针对普通套接字的对象,里面有对于普通套接字事件就绪的处理方法类。
最底层的就是这三层结构。下面我们来实现这三层结构。
3 准备工作
在实现三层结构之前,我们先对多路转接IO进行封装,让代码尽可能解耦:
对于多路转接,我们设计一个基类,作为上层调用的统一接口。然后继承出子类Epoll poll select
,在子类中分别实现对应的方法。
这里只提供了Epoll的封装:
- 构造函数:构造时创建EPOLL模型,获得EPOLLfd。
- AddEvent:添加事件,调用epoll_ctl_add方法即可。
- Wait:获取底层就绪事件,直接使用epoll_wait即可
#pragma once
#include <iostream>
#include <stdlib.h>
#include <sys/epoll.h>
#include "Log.hpp"
#include "Comm.hpp"
using namespace log_ns;
// 多路复用基类
class Mutliplex
{
public:
Mutliplex(/* args */)
{
}
virtual bool AddEvent(int fd, uint32_t events) = 0;
virtual int Wait(struct epoll_event revs[], int num, int timeout) = 0;
~Mutliplex()
{
}
};
// epoll poll select基类
class Epoller : public Mutliplex
{
private:
static const int size = 128;
public:
Epoller()
{
_epollfd = ::epoll_create(size);
if (_epollfd < 0)
{
LOG(ERROR, "epoll create failed!\n");
exit(EPOLL_CREATE);
}
}
std::string EventToString(uint32_t revents)
{
std::string ret;
if (revents & EPOLLIN)
ret += "EPOLLIN";
if (revents & EPOLLOUT)
ret += "| EPOLLOUT";
return ret;
}
bool AddEvent(int fd, uint32_t events)
{
struct epoll_event ev;
ev.data.fd = fd;
ev.events = events;
int n = ::epoll_ctl(_epollfd, EPOLL_CTL_ADD, fd, &ev);
if (n < 0)
{
LOG(ERROR, "epoll_ctl add failed , errno:%d", errno);
return -1;
}
LOG(INFO, "epoll_ctl add fd:%d , events:%s\n", fd, EventToString(events).c_str());
return true;
}
int Wait(struct epoll_event revs[], int num, int timeout)
{
return ::epoll_wait(_epollfd, revs, num, timeout);
}
~Epoller()
{
}
private:
int _epollfd;
};
4 Reactor类的设计
之前的TcpServer等服务端都要在内部封装_listensock。如果封装了监听套接字那么代码结构就定型了,就必须要有对监听套接字的处理。而这里我们想将Reactor设计一个管理connection连接的类,不需要针对监听套接字进行特殊处理
成员变量:
- 通过fd映射
Connection*
对象的哈希表_conn
- 判断是否启动
bool isrunning
- 构建一个Multipex对象 , 构造时建立epoll指针,负责处理多路转接IO
- 就绪事件组
struct epoll_event revs[gnum]
- 针对监听套接字的方法集,在添加连接时可以将方法设置进入
connection
中 - 针对普通套接字的方法集。
回调方法的类型为using handler_t = std::function<void(Connection *conn)>;
#pragma once
#include <string>
#include <iostream>
#include <memory>
#include <unordered_map>
#include "Connection.hpp"
#include "Epoller.hpp"
using namespace log_ns;
class Reactor
{
private:
static const int gnum = 128;
public:
Reactor() : _epoller(std::make_unique<Epoller>()), _isrunning(false)
{
}
void SetOnNormal(handler_t OnRecver, handler_t OnSender, handler_t OnExcepeter)
{
_OnRecver = OnRecver;
_OnSender = OnSender;
_OnExcepeter = OnExcepeter;
}
void SetOnConnect(handler_t OnConnect)
{
_OnConnect = OnConnect;
}
// 加入连接
void AddConnection(int fd, uint32_t events, const InetAddr &addr, int type)
{
}
void Dispatcher()
{
}
~Reactor()
{
}
private:
// fd 映射连接表
std::unordered_map<int, Connection *> _conn;
// 是否启动
bool _isrunning;
std::unique_ptr<Mutliplex> _epoller;
// 事件数组
struct epoll_event revs[gnum];
//_listen新连接到来
handler_t _OnConnect;
// 处理普通fd IO
handler_t _OnRecver;
handler_t _OnSender;
handler_t _OnExcepeter;
};
- Addconnection接口 :首先通过
fd events
与客户端信息和连接类型建立connection
, 进行设置对应的事件集, 然后根据type判断类型,设置connection的上层处理回调方法。注意这里要对conn
与Reactor
进行关联 !后续connection的模块进行讲解,设置addr方便打印日志(可以知道是哪一个客户端);然后通过fd
与events
托管给epoll 进行添加事件 。最后将连接放入哈希表中。 - IsConnExists判断当前连接是否存在
- Dispatch()事件派发接口:进行while循环,获取底层哪些事件就绪 储存在成员变量
struct epoll_event revs[gnum]
,根据返回值 n 对n个事件进行处理!这里只处理 ERR HUP IN OUT 使用if语句ERR HUP直接设置为IN OUT后续统一处理IN事件就绪 事件派发 通过_conn[fd]找到对应连接 执行对应事件的回调函数(注意保证连接存在 且 回调方法存在)。
完整代码如下:
#pragma once
#include <string>
#include <iostream>
#include <memory>
#include <unordered_map>
#include "Connection.hpp"
#include "Epoller.hpp"
using namespace log_ns;
class Reactor
{
private:
static const int gnum = 128;
public:
Reactor() : _epoller(std::make_unique<Epoller>()), _isrunning(false)
{
}
void SetOnNormal(handler_t OnRecver, handler_t OnSender, handler_t OnExcepeter)
{
_OnRecver = OnRecver;
_OnSender = OnSender;
_OnExcepeter = OnExcepeter;
}
void SetOnConnect(handler_t OnConnect)
{
_OnConnect = OnConnect;
}
// 加入连接
void AddConnection(int fd, uint32_t events, const InetAddr &addr, int type)
{
// 1. 通过 fd 构建一个 connection指针 set对应的事件集
Connection *conn = new Connection(fd);
conn->SetReactor(this);
conn->SetEvents(events);
conn->SetConnectionType(type);
conn->SetAddr(addr);
// 2. TODO 设置对connection的上层处理 设置回调方法
if (conn->Type() == ListenConnection)
{
conn->RegisterHandler(_OnConnect, nullptr, nullptr); // 设置方法
}
else
{
conn->RegisterHandler(_OnRecver, _OnSender, _OnExcepeter); // 设置方法
}
// 3. fd 与 events 托管给epoll 添加事件 出错直接 return;
int n = _epoller->AddEvent(fd, events);
// 4. 托管给_connection
_conn.insert(std::make_pair(fd, conn));
// 添加连接成功
}
// 判断连接是否存在
bool IsConnExist(int fd)
{
return _conn.find(fd) != _conn.end();
}
void LoopOnce(int timeout)
{
// 获取底层事件
int n = _epoller->Wait(revs, gnum, -1);
for (int i = 0; i < n; i++)
{
// 文件描述符
int fd = revs[i].data.fd;
// 就绪事件
uint32_t revents = revs[i].events;
// 处理IN OUT ERR HUP
if (revents & EPOLLERR)
revents |= (EPOLLIN | EPOLLOUT);
if (revents & EPOLLHUP)
revents |= (EPOLLIN | EPOLLOUT);
if (revents & EPOLLIN)
{
// 调用回调方法
if (IsConnExist(fd) && _conn[fd]->_handler_recver)
_conn[fd]->_handler_recver(_conn[fd]);
}
if (revents & EPOLLOUT)
{
// 调用回调方法
if (IsConnExist(fd) && _conn[fd]->_handler_sender)
_conn[fd]->_handler_sender(_conn[fd]);
}
}
}
void Dispatcher()
{
_isrunning = true;
int timeout = -1;
while (true)
{
LoopOnce(timeout);
PrintDebug();//打印托管的fd列表
}
_isrunning = false;
}
void PrintDebug()
{
std::string s = "已建立的连接:";
for (auto &conn : _conn)
{
s += std::to_string(conn.first) + ' ';
}
LOG(DEBUG, "epoll 管理的fd列表: %s\n", s.c_str());
}
~Reactor()
{
}
private:
// fd 映射连接表
std::unordered_map<int, Connection *> _conn;
// 是否启动
bool _isrunning;
std::unique_ptr<Mutliplex> _epoller;
// 事件数组
struct epoll_event revs[gnum];
//_listen新连接到来
handler_t _OnConnect;
// 处理普通fd IO
handler_t _OnRecver;
handler_t _OnSender;
handler_t _OnExcepeter;
};
5 Connection连接接口
- 成员变量:
- 文件描述符
fd
- 需要关心的事件集
events
- 输入缓冲区 输出缓冲区
- 三种事件的回调方法
- 设置一个Reactor* _R
- 文件描述符
- SetEvents接口:通过传入events 初始化 events
- Events接口返回事件集
- Sockfd返回对应fd
- RegisterHandler接口快速设置回调方法
- SetReactor(Reactor* R)接口 connection与Reactor进行绑定,执行自己属于的Reactor
对于这个Reactor* _R
指针,是监听套装字获取到连接时发挥作用。当监听套接字的事件就绪,在回调方法中可以通过参数Connection取出内部的_R指针,找到对应的Reactor,进行AddConnection操作。
#pragma once
#include <iostream>
#include <string>
#include <functional>
#include "InetAddr.hpp"
class Connection;
class Reactor;
using handler_t = std::function<void(Connection *conn)>;
#define ListenConnection 0
#define NormalConnection 1
class Connection
{
public:
Connection(int fd) : _sockfd(fd)
{
}
void RegisterHandler(handler_t recver, handler_t sender, handler_t excepter)
{
_handler_recver = recver; // 处理读取
_handler_sender = sender; // 处理写入
_handler_excepter = excepter; // 处理异常
}
void SetEvents(uint32_t events)
{
_events = events;
}
void SetAddr(const InetAddr &addr)
{
_addr = addr;
}
int Sockfd()
{
return _sockfd;
}
uint32_t Events()
{
return _events;
}
int Type()
{
return _type;
}
void SetReactor(Reactor *R)
{
_R = R;
}
void SetConnectionType(int type)
{
_type = type;
}
Reactor *GetReactor()
{
return _R;
}
InetAddr GerInetAddr()
{
return _addr;
}
void AppendInbuffer(const std::string &in)
{
_inbuffer += in;
}
std::string &Inbuffer()
{
return _inbuffer;
}
~Connection()
{
}
private:
int _sockfd; // 套接字fd
uint32_t _events; // 事件集
std::string _inbuffer; // 输入缓冲区
std::string _outbuffer; // 输出缓冲区
Reactor *_R;
int _type;
InetAddr _addr;
public:
handler_t _handler_recver; // 处理读取
handler_t _handler_sender; // 处理写入
handler_t _handler_excepter; // 处理异常
};
6 回调方法
这里需要两种回调方法类,一种针对监听套接字,一种针对普通套接字。
- Listener统一管理Tcp连接模块,管理_listensock
- 成员变量 :
- std::unique_ptr _listensock Tcp套接字对象
- int _port;端口号
- 通过端口号进行构造TcpSocket
ListenSock
接口返回_listensock
的fd。Accepter(conn* , int* code)
方法获取连接并得到文件描述符 (这里采用ET模式)首先将listensockfd
读取设置为非阻塞读取,然后进行while(true)进行非阻塞读取 ,根据Accepter返回的错误码通过code返回 通过错误码进行判断,当读取到一个新的fd
时,通过conn的Reactor指针
调用AddConnection
加入新连接!
#pragma once
#include <memory>
#include <iostream>
#include "Socket.hpp"
#include "Connection.hpp"
using namespace log_ns;
using namespace socket_ns;
// 处理listen套接字的读写
class Listener
{
public:
Listener(uint16_t port) : _port(port), _listensock(std::make_unique<TcpSocket>(port))
{
_listensock->BuildListenSocket(_port);
}
int ListenSockfd()
{
return _listensock->GetSockfd();
}
void Accepter(Connection *conn)
{
LOG(DEBUG, "%d socket ready\n", conn->Sockfd());
// 非阻塞式读取
while (true)
{
errno = 0;
int code = 0;
InetAddr addr;
int sockfd = _listensock->Accepter(&addr, &code);
if (sockfd > 0)
{
LOG(INFO, "成功获取连接, 客户端:%s sockfd:%d\n", addr.AddrStr().c_str(), sockfd);
conn->GetReactor()->AddConnection(sockfd, EPOLLIN | EPOLLET, addr, NormalConnection);
}
else
{
if (code == EWOULDBLOCK)
{
// 读取完毕
LOG(INFO, "底层数据全部读取完毕!\n");
break;
}
// 信号中断
else if (code == EINTR)
{
continue;
}
else
{
LOG(ERROR, "获取连接失败!\n");
break;
}
}
}
}
~Listener()
{
}
private:
uint16_t _port;
std::unique_ptr<Socket> _listensock;
};
- 处理普通连接读写问题,这个的设计就比较简单了,注意其只复杂数据的读取,协议解析需要交给上层进行处理!
- HandlerRecver(conn*):我们先实现读取的逻辑!
- HandlerSender(conn*):后续实现
- HandlerExcepter(conn*):后续实现
#include <sys/types.h>
#include <sys/socket.h>
// 不应该让HandlerConnection处理报文
class HandlerConnection
{
private:
const static int buffersize = 512;
public:
HandlerConnection(handler_t process) : _process(process)
{
}
void Recver(Connection *conn)
{
// LOG(DEBUG , "client发送信息: %d\n" , conn->Sockfd());
// 进行正常读写 --- 非阻塞读取
while (true)
{
char buffer[buffersize];
int n = ::recv(conn->Sockfd(), buffer, sizeof(buffer) - 1, 0);
if (n > 0)
{
// buffer是一个数据块 添加到conn的输入缓冲区中
buffer[n] = 0;
conn->AppendInbuffer(buffer);
// 数据交给上层处理
}
else if (n == 0)
{
// 连接断开
LOG(INFO, "客户端[%s]退出, 服务器准备关闭fd: %d\n", conn->GerInetAddr().AddrStr().c_str(), conn->Sockfd());
conn->_handler_excepter(conn); // 统一执行异常处理
}
else
{
// 本轮数据读完了
if (errno == EWOULDBLOCK)
{
// 这是唯一出口
break;
}
// 信号中断
else if (errno == EINTR)
{
continue;
}
// 出现异常
else
{
conn->_handler_excepter(conn);
return;
}
}
}
// 读取完毕,我们应该处理数据了!
// 加入协议
std::cout << "Inbuffer 内容:" << conn->Inbuffer() << std::endl;
_process(conn);
}
void Sender(Connection *conn)
{
}
void Excepter(Connection *conn)
{
}
~HandlerConnection()
{
}
private:
handler_t _process;
};
至此,Reactor反应堆模型的框架已经搭建好了,下一篇文章我们将在这个的基础之上进行协议解析与数据处理!并设计如何将数据发回。这里只是简单的实现读取数据的逻辑!