Linux–多路转接之epoll
Reactor反应堆模式
Reactor反应堆模式是一种事件驱动的设计模式,通常用于处理高并发的I/O操作,尤其是在服务器或网络编程中。
基本概念
Reactor模式又称之为响应器模式,基于事件多路复用机制,使得单个线程能够同时管理大量并发连接,而不需要为每个连接创建一个独立的线程。它通过一个事件分发器(Reactor)来监听和管理不同的I/O事件,当事件发生时,分发器会将该事件分发给对应的事件处理器来处理。
核心组件
- 事件分发器(Reactor):负责监听各种事件源(如socket、文件描述符)并将事件分发给相应的处理器。事件分发器通常使用I/O多路复用机制(如select、poll、epoll)来同时监听多个I/O事件。
- 事件处理器(Event Handler):定义了如何处理特定事件。当事件分发器检测到某个事件时,就会触发相应的事件处理器中的回调函数。
- 同步事件分离器(Demultiplexer):本质上是系统调用,用于监听事件源上的事件,并将事件通知给事件分发器。例如,在Linux中,可以使用select、poll或epoll等系统调用来实现同步事件分离器。
工作流程
- 注册事件:事件分发器注册需要监听的I/O事件(如连接、读写),并关联相应的事件处理器。
- 进入循环:事件分发器进入循环,使用I/O多路复用机制来监听注册的I/O事件。
- 分发事件:一旦某个I/O事件发生,事件分发器会将该事件分发给对应的事件处理器。
- 处理事件:事件处理器执行预定义的操作来处理该事件。处理完成后,可能会重新注册事件或关闭连接。
epoll服务器(ET)
服务器监听一个指定的端口,当有新的连接请求到来时,服务器接受连接并将其注册到Reactor中,以便处理后续的数据读写事件。
Socket.hpp
包含了一个抽象基类 Socket 和一个继承自 Socket 的具体实现类 TcpSocket。提供一个面向对象的网络套接字编程接口,允许用户通过继承和实现基类中的纯虚函数来创建不同类型的套接字(例如 TCP 套接字)。
#include <iostream>
#include <string>
#include <functional>
#include <sys/types.h> /* See NOTES */
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <cstring>
#include <pthread.h>
#include <sys/types.h>
#include <memory>
#include "InetAddr.hpp"
#include "Log.hpp"
#include "Comm.hpp"
namespace socket_ns
{
class Socket;
const static int gbacklog=8;//默认最大连接数
using socket_sptr=std::shared_ptr<Socket>;//套接字指针
enum
{
SOCKET_ERROR = 1,
BIND_ERROR,
LISTEN_ERROR,
USAGE_ERROR
};
//在基类创建一系列虚函数,只要派生类能用到就在这里创建
class Socket
{
public:
virtual void CreateSocketOrDie() =0; //创建套接字
virtual void BindSocketOrDie(InetAddr& addr) =0; //绑定套接字
virtual void ListenSocketOrDie()=0; //监听套接字
virtual int Accepter(InetAddr* addr,int* code) =0; //接受客户端
virtual bool Connector(InetAddr &addr) = 0; //连接客户端
virtual int SockFd() = 0; //获取Sockfd
virtual int Recv(std::string *out) = 0; //接收对方信息
virtual int Send(const std::string &in) = 0; //发送给对方信息
virtual void Close()=0; //关闭对应文件
public:
//创建监听套接字,将一系列操作细分化,直接引用对应函数直接创建
void BuildListenSocket(InetAddr& addr)
{
CreateSocketOrDie();
BindSocketOrDie(addr);
ListenSocketOrDie();
}
bool BuildClientSocket(InetAddr &addr)
{
CreateSocketOrDie();
return Connector(addr);
}
};
class TcpSocket : public Socket
{
public:
TcpSocket(int sockfd=-1)
:_sockfd(sockfd)
{}
void CreateSocketOrDie() override //override明确的重写基类函数
{
_sockfd=socket(AF_INET,SOCK_STREAM,0);
if(_sockfd<0)
{
LOG(FATAL, "socket error");
exit(SOCKET_ERROR);
}
SetNonBlock(_sockfd);
LOG(DEBUG, "socket create success, sockfd is : %d\n", _sockfd);
}
void BindSocketOrDie(InetAddr& addr) override
{
struct sockaddr_in local;
memset(&local, 0, sizeof(local));
local.sin_family = AF_INET;
local.sin_port = htons(addr.Port());
local.sin_addr.s_addr = inet_addr(addr.Ip().c_str());
int n=bind(_sockfd,(struct sockaddr*)&local,sizeof(local));
if (n < 0)
{
LOG(FATAL, "bind error");
exit(BIND_ERROR);
}
LOG(DEBUG, "bind success, sockfd is : %d\n", _sockfd);
}
void ListenSocketOrDie() override
{
int n=listen(_sockfd,gbacklog);
if (n < 0)
{
LOG(FATAL, "listen error");
exit(LISTEN_ERROR);
}
LOG(DEBUG, "listen success, sockfd is : %d\n", _sockfd);
}
int Accepter(InetAddr* addr,int* code) override
{
struct sockaddr_in peer;
socklen_t len=sizeof(peer);
int sockfd = accept(_sockfd,(struct sockaddr*)&peer,&len);
*code=errno;
if (sockfd < 0)
{
LOG(WARNING, "accept error\n");
return -1;
}
*addr=peer;
SetNonBlock(sockfd);
//socket_sptr sock=std::make_shared<TcpSocket>(sockfd);
return sockfd;
}
virtual bool Connector(InetAddr& addr)
{
struct sockaddr_in server;
memset(&server,0,sizeof(server));
server.sin_family=AF_INET;
server.sin_addr.s_addr=inet_addr(addr.Ip().c_str());
server.sin_port=htons(addr.Port());
int n=connect(_sockfd,(struct sockaddr*)&server,sizeof(server));
if (n < 0)
{
std::cerr << "connect error" << std::endl;
return false;
}
return true;
}
int Recv(std::string *out) override
{
char inbuffer[1024];
ssize_t n = recv(_sockfd,inbuffer,sizeof(inbuffer)-1,0);
if (n > 0)
{
inbuffer[n] = 0;
*out += inbuffer; // 接收次数可能不只一次,一般是多次的,
}
return n;
}
int Send(const std::string &in) override
{
int n = send(_sockfd,in.c_str(),in.size(),0);
return n;
}
int SockFd() override
{
return _sockfd;
}
void Close() override
{
if (_sockfd > -1)
::close(_sockfd);
}
~TcpSocket()
{}
private:
int _sockfd;
};
}
代码和之前不一样的地方是实现了非阻塞套接字的设置
Calculate.hpp
用于执行基本的算术运算
#pragma once
#include <iostream>
#include "ProToCol.hpp"
using namespace protocol_ns;
class Calculate
{
public:
Calculate()
{
}
//根据输入的请求通过实际计算转换为结果
Response Excute(const Request &req)
{
Response resp(0, 0);
switch (req._oper)
{
case '+':
resp._result = req._x + req._y;
break;
case '-':
resp._result = req._x - req._y;
break;
case '*':
resp._result = req._x * req._y;
break;
case '/':
{
if (req._y == 0)
{
resp._code = 1;
}
else
{
resp._result = req._x / req._y;
}
}
break;
case '%':
{
if (req._y == 0)
{
resp._code = 2;
}
else
{
resp._result = req._x % req._y;
}
}
break;
default:
resp._code = 3;
break;
}
return resp;
}
~Calculate()
{
}
private:
};
protocol.hpp
用于处理网络通信中数据序列化和反序列化、编码和解码以及请求和响应对象生成的类和函数.
#pragma once
#include <iostream>
#include <string>
#include<unistd.h>
#include<memory>
#include<jsoncpp/json/json.h>
namespace protocol_ns
{
// 协议的样子:
// 报文 = 报头+有效载荷
// "有效载荷的长度"\r\n"有效载荷"\r\n
const std::string SEP= "\r\n";
// 解决TCP的粘报问题,TCP 读取不全的问题
std::string Encode(const std::string &json_str)
{
int json_str_len = json_str.size(); //有效载荷的长度
std::string proto_str = std::to_string(json_str_len); //转为string
proto_str += SEP; //+ 分隔符
proto_str += json_str;// + 数据字符串
proto_str += SEP;// + 分隔符
return proto_str; //返回一个报文
}
//将报文分析出数据字符串出来
std::string Decode(std::string &inbuffer)
{
auto pos = inbuffer.find(SEP); //找到分隔符的位置
if (pos == std::string::npos)
return std::string();
std::string len_str = inbuffer.substr(0, pos);//前头的有效数据长度的字符串
if (len_str.empty())
return std::string();
int packlen = std::stoi(len_str);//记录数据字符串的实际长度(传递时的差错主要出在这里)
int total = packlen + len_str.size() + 2 * SEP.size(); //报文总长度
if (inbuffer.size() < total)
return std::string();
std::string package = inbuffer.substr(pos + SEP.size(), packlen); //取出数据字符串
inbuffer.erase(0, total); //删除掉原先的报文
return package;
}
//请求将我们的数据序列化和反序列化(客户端)
class Request
{
public:
Request()
{
}
Request(int x, int y, char oper) : _x(x), _y(y), _oper(oper)
{
}
//序列化:将结构体数据转换为字符串
bool Serialize(std::string* out)
{
Json::Value root; //Json::Value: Json格式的值
root["x"] = _x;
root["y"] = _y;
root["oper"] = _oper;
Json::FastWriter writer;
*out=writer.write(root); //将Json值转换为字符串
return true;
}
//反序列化:将字符串转换为结构体数据
bool DeSerialize(const std::string& in)
{
Json::Value root;
Json::Reader reader;//解析字符串
bool res=reader.parse(in,root);//将字符串转为Json值,存放于root中
if (!res)
return false;
//再将Json值转为结构体数据
_x = root["x"].asInt();
_y = root["y"].asInt();
_oper = root["oper"].asInt();
return true;
}
public:
int _x;
int _y;
char _oper; //操作符 _x 加减乘除 _y
};
//将结果序列化和反序列化(服务端)
class Response
{
public:
Response()
{
}
Response(int result, int code) : _result(result), _code(code)
{
}
bool Serialize(std::string *out)
{
// 转换成为字符串
Json::Value root;
root["result"] = _result;
root["code"] = _code;
Json::FastWriter writer;
// Json::StyledWriter writer;
*out = writer.write(root);
return true;
}
bool Deserialize(const std::string &in)
{
Json::Value root;
Json::Reader reader;
bool res = reader.parse(in, root);
if (!res)
return false;
_result = root["result"].asInt();
_code = root["code"].asInt();
return true;
}
public:
int _result; // 结果
int _code; // 0:success 1: 除0 2: 非法操作 3. 4. 5
};
//创建需求
class Factory
{
public:
Factory()
{
srand(time(nullptr) ^ getpid());
opers = "+-*/%^&|";
}
std::shared_ptr<Request> BuildRequest()
{
int x = rand() % 10 + 1;
usleep(x * 10);
int y = rand() % 5; // [0,1,2,3,4]
usleep(y * x * 5);
char oper = opers[rand() % opers.size()];
std::shared_ptr<Request> req= std::make_shared<Request>(x,y,oper);
return req;
}
std::shared_ptr<Response> BuildResponse()
{
return std::make_shared<Response>();
}
~Factory()
{
}
private:
std::string opers;
};
}
SEP
:定义了报文分隔符为 “\r\n”。Encode
:接受一个 JSON 字符串作为有效载荷,将其长度、分隔符和有效载荷拼接成一个完整的报文字符串返回。Decode
:从输入缓冲区 inbuffer 中解析出一个报文,返回有效载荷字符串,并从 inbuffer 中删除已解析的报文。
Request 类:
- 表示一个计算请求,包含两个整数 _x 和 _y 作为操作数,以及一个字符 _oper 作为运算符。
Serialize
:将 Request 对象序列化为 JSON 格式的字符串。DeSerialize
:将 JSON 格式的字符串反序列化为 Request 对象。
Response 类:
- 表示一个计算响应,包含一个整数 _result 作为运算结果,以及一个整数 _code 作为状态码。
Serialize
:将 Response 对象序列化为 JSON 格式的字符串。Deserialize
:将 JSON 格式的字符串反序列化为 Response 对象。
Factory 类:用于生成 Request 和 Response 对象的工厂类。
在构造函数中初始化了一个包含所有可能运算符的字符串 opers,并使用当前时间和进程 ID 作为随机数种子。
- BuildRequest:生成一个随机的 Request 对象,其中包括随机的操作数和运算符。
- BuildResponse:生成一个默认的 Response 对象。目前,这个实现只是简单地返回了一个新创建的 Response 对象,没有设置任何特定的值。
PackageParse.hpp
负责解析从连接(Connection 对象)中接收到的报文,处理这些报文,并将响应发送回客户端
#pragma once
#include <iostream>
#include "Connection.hpp"
#include "ProToCol.hpp"
#include "CalCulate.hpp"
using namespace protocol_ns;
//对报文进行解析
class PackageParse
{
public:
static void Parse(Connection *conn)
{
// std::cout << "inbuffer: " << conn->Inbuffer() << std::endl;
// 2. 分析数据,确认完整报文
std::string package;
Request req;
Calculate cal;
while (true)
{
// std::cout << conn->Inbuffer() << std::endl;
// conn->AppendOutBuffer(conn->Inbuffer());
// break;
package = Decode(conn->Inbuffer());//取出缓冲区的报文
if (package.empty())
break;
std::cout << "------------------------begin---------------" << std::endl;
std::cout << "resq string:\n"<< package << std::endl;
// 3.反序列化
req.DeSerialize(package);
// 4. 业务处理
Response resp = cal.Excute(req);
// 5. 对应答做序列化
std::string send_str;
resp.Serialize(&send_str);
std::cout << "resp Serialize:" << std::endl;
std::cout << send_str << std::endl;
// 6. 添加长度报头
send_str = Encode(send_str);
std::cout << "resp Encode:" << std::endl;
std::cout << send_str << std::endl;
//将报文放到发送缓冲区中
conn->AppendOutBuffer(send_str);
}
//将缓冲区内容取出,发送到客户端
if(!conn->OutbufferEmpty()&& conn->_sender!=nullptr)
{
conn->_sender(conn);
conn->_R->EnableReadWrite(conn->Sockfd(), true, true);
}
}
};
Comm.hpp
#pragma once
#include <iostream>
#include <unistd.h>
#include <fcntl.h>
//错误原因
enum
{
SOCKET_ERROR = 1,
BIND_ERROR,
LISTEN_ERROR,
USAGE_ERROR,
EPOLL_CREATE_ERROR,
};
//设置为非阻塞的
void SetNonBlock(int fd)
{
int fl = ::fcntl(fd, F_GETFL);//获取之前的信息
if(fl < 0)
{
return;
}
fcntl(fd, F_SETFL, fl | O_NONBLOCK);//转换为非阻塞的
}
Connection.hpp
一个网络连接,用于在客户端和服务器之间传输数据;
Connection 类与 Reactor 类一起工作,实现了事件驱动的网络编程模型。
在这个模型中,Reactor 负责监听和处理各种网络事件(如连接、读取、写入等),而 Connection 对象则作为这些事件的处理者。
#pragma once
#include <iostream>
#include <string>
#include <functional>
#include"Reactor.hpp"
#include "InetAddr.hpp"
#include <unistd.h>
class Connection;
class Reactor;
using func_t = std::function<void(Connection *)>; //定义出有关连接函数的指针
//连接客户端与服务端
//而客户端发出的请求称为事件,服务端称为反应堆,会将事件统收,将处理好的事件派发出去
//连接起到事件与反应堆之间的桥梁
class Connection
{
public:
Connection(int sock) : _sock(sock), _R(nullptr)
{
}
int Sockfd()
{
return _sock;
}
//设置有关事件(初始化)
void SetEvents(int events)
{
_events = events;
}
uint32_t Events()
{
return _events;
}
//初始化对应函数指针,调用时触发函数
void Register(func_t recver, func_t sender, func_t excepter)
{
_recver = recver;
_sender = sender;
_excepter = excepter;
}
//初始化反应堆
void SetSelf(Reactor *R)
{
_R = R;
}
//将数据放入输入缓冲区
void AppendInBuffer(const std::string &buff)
{
_inbuffer += buff;
}
//将数据拿出
std::string& Inbuffer()
{
return _inbuffer;
}
//将数据放到输出缓冲区
void AppendOutBuffer(const std::string &buff)
{
_outbuffer += buff;
}
//将数据拿出
std::string &Outbuffer()
{
return _outbuffer;
}
//判断输出缓冲区是不是为空
bool OutbufferEmpty()
{
return _outbuffer.empty();
}
//将输出缓冲区数据拿出多少
void OutbufferRemove(int n)
{
_outbuffer.erase(0, n);
}
void Close()
{
if(_sock>=0)
::close(_sock);
}
~Connection()
{
}
func_t _recver; //接收者(调用触发接收函数)
func_t _sender; //发送者(调用触发发送函数)
func_t _excepter; //其他,处理错误
Reactor *_R; //反应堆指针(服务器)
private:
int _sock; //sockfd
std::string _inbuffer;//输入缓冲区
std::string _outbuffer;//输出缓冲区
InetAddr _addr;//网络地址
uint32_t _events; // Connection对象中,_sock关心的事件集合
};
- 构造函数:接收一个套接字描述符(sock),并将其存储在私有成员 _sock 中。同时,将 _R(指向 Reactor 的指针)初始化为 nullptr。
Sockfd
方法:返回与这个连接关联的套接字描述符。SetEvents
和Events
方法:允许设置和查询这个连接关心的事件集合(如可读、可写等)。这些事件用于通知 Reactor 何时应该对这个连接进行操作。Register
方法:允许为这个连接注册三个回调函数:_recver(接收数据时调用)、_sender
(发送数据时调用)和_excepter
(处理错误时调用)。这些回调函数是 std::function<void(Connection *)> 类型的,意味着它们可以接受一个指向 Connection 对象的指针作为参数。SetSelf
方法:允许设置这个连接所属的 Reactor 对象(通过 _R 指针)。AppendInBuffer
和Inbuffer
方法:用于管理输入缓冲区。AppendInBuffer 方法将接收到的数据添加到输入缓冲区中,而 Inbuffer 方法则返回输入缓冲区的引用。
HandlerConnection.hpp
对具体处理函数的实现:
#pragma once
#include <iostream>
#include <sys/types.h>
#include <sys/socket.h>
#include <functional>
#include "Log.hpp"
#include "Connection.hpp"
class HandlerConnection
{
public:
HandlerConnection(func_t func):_func(func)
{}
//处理接收
void HanlderRecv(Connection *conn)
{
LOG(DEBUG, "HanlderRecv fd : %d\n", conn->Sockfd());
while (true)
{
errno = 0; //表示当前没有错误
char buffer[1024];//接收存储区域
ssize_t n = ::recv(conn->Sockfd(), buffer, sizeof(buffer) - 1, 0);//接收函数
if(n > 0)
{
buffer[n] = 0;
conn->AppendInBuffer(buffer);//将数据放入缓冲区中
}
else
{
if(errno == EWOULDBLOCK || errno == EAGAIN)//将报文接收完了(在非阻塞操作中)
{
break;
}
else if(errno == EINTR)//事件操作时被中断了
{
continue;
}
else//出现错误
{
conn->_excepter(conn);//其他处理
return; // 一定要提前返回
}
}
}
_func(conn);//调用函数 ,处理解析
}
//处理发送
void HanlderSend(Connection *conn)
{
errno = 0;
while(true)
{
ssize_t n = ::send(conn->Sockfd(), conn->Outbuffer().c_str(), conn->Outbuffer().size(), 0);//发送到客户端
if(n > 0)
{
// n 实际发送了多少
conn->OutbufferRemove(n);//发完的在缓冲区去掉
if(conn->OutbufferEmpty()) break;
}
else if(n == 0)//没有发送数据了
{
break;
}
else
{
if(errno == EWOULDBLOCK || errno == EAGAIN)//缓冲区读取完毕
{
break;
}
else if(errno == EINTR)//事件中断
{
continue;
}
else//出现错误
{
conn->_excepter(conn);
return;
}
}
}
//发送缓冲区不为空时
if(!conn->OutbufferEmpty())
{
conn->_R->EnableReadWrite(conn->Sockfd(), true, true); //可读可写
}
else//发送缓冲区为空时,不可写出
{
conn->_R->EnableReadWrite(conn->Sockfd(), true, false);
}
}
//处理其他
void HanlderExcpet(Connection *conn)
{
errno = 0;
LOG(DEBUG, "client quit : %d\n",conn->Sockfd());
conn->_R->RemoveConnection(conn->Sockfd());//断开连接
}
private:
func_t _func;//函数指针
};
Epoller.hpp
封装了 Linux 中 epoll 接口的使用,用于高效地管理大量并发网络连接或文件描述符的事件通知。
#pragma once
#include <iostream>
#include <sys/epoll.h>
#include <unistd.h>
#include "Log.hpp"
#include "Comm.hpp"
static const int gsize=128;
class Epoller
{
private:
bool EventMethodCore(int fd,u_int32_t events,int type)
{
struct epoll_event ev;
ev.events = events;
ev.data.fd = fd;
int n = ::epoll_ctl(_epfd, type, fd, &ev);
if(n < 0)
{
LOG(ERROR, "epoll_ctl error!\n");
return false;
}
LOG(DEBUG, "epoll_ctl add %d success!\n", fd); // TODO
return true;
}
public:
//初始化,创建epoll
Epoller()
{
_epfd = ::epoll_create(gsize);
if (_epfd < 0)
{
LOG(FATAL, "epoll create error!\n");
exit(EPOLL_CREATE_ERROR);
}
LOG(FATAL, "epoll create success, epfd: %d\n", _epfd);
}
//将事件添加到epoll中
bool AddEvent(int fd, uint32_t events)
{
return EventMethodCore(fd,events,EPOLL_CTL_ADD);
}
//将事件进行修改
bool ModEvent(int fd, uint32_t events)
{
return EventMethodCore(fd, events, EPOLL_CTL_MOD);
}
//对事件进行删除
bool DelEvent(int fd)
{
return ::epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, nullptr);
}
//epoll等待事件的发生
int Wait(struct epoll_event revs[], int num, int timeout)
{
int n = ::epoll_wait(_epfd, revs, num, timeout);
return n;
}
~Epoller()
{
if(_epfd >= 0)//析构需要释放掉epoll的fd
::close(_epfd);
}
private:
int _epfd;//epoll的fd
};
EventMethodCore
:这是一个辅助方法,用于向 epoll 实例中添加、修改或删除事件。
Listener.hpp
Listener 的类: 用于在指定端口上监听并接受新的连接请求的
#pragma once
#include <iostream>
#include <memory>
#include "Connection.hpp"
#include "Epoller.hpp"
#include "Socket.hpp"
#include "InetAddr.hpp"
#include "HandlerConnection.hpp"
using namespace socket_ns;
class Listener
{
public:
Listener(int port, HandlerConnection &hc)
: _port(port),
_listensock(std::make_unique<TcpSocket>()),
_hc(hc)
{
InetAddr addr("0", _port);
_listensock->BuildListenSocket(addr);//创建监听fd
}
//接收新的连接
void Accepter(Connection *conn)
{
while (true)
{
InetAddr clientaddr;
int code = 0;
int sockfd = _listensock->Accepter(&clientaddr, &code);//接收新连接
cout<<sockfd<<endl;
if (sockfd >= 0)
{
// TODO
//对反应堆增加新连接
conn->_R->AddConnection(
sockfd,
EPOLLIN | EPOLLET,
std::bind(&HandlerConnection::HanlderRecv, &_hc, std::placeholders::_1),
std::bind(&HandlerConnection::HanlderSend, &_hc, std::placeholders::_1),
std::bind(&HandlerConnection::HanlderExcpet, &_hc, std::placeholders::_1));
}
else//出现错误
{
//cout<<123<<endl;
if (code == EWOULDBLOCK || code == EAGAIN)//表示接收了所有连接
{
LOG(DEBUG, "accepter all link!\n");
break;
}
else if (code == EINTR)//事件中断
{
LOG(DEBUG, "accepter interupt by signal!\n");
continue;
}
else//出现错误
{
LOG(WARNING, "accept error!\n");
break;
}
}
}
}
int Sockfd()
{
return _listensock->SockFd();
}
~Listener()
{
_listensock->Close();
}
private:
uint16_t _port; //端口号
std::unique_ptr<Socket> _listensock;//监听fd
HandlerConnection &_hc; //连接处理事件
};
Accepter:
将这个新的连接添加到 Epoller 中,注册相应的读、写异常事件处理函数。这些处理函数是通过 std::bind 绑定到 HandlerConnection 的成员函数上的。
如果接收过程中出现错误,根据错误码 code 的不同,采取不同的处理方式:
EWOULDBLOCK
或EAGAIN
:表示所有可用的连接都已被接受,此时跳出循环。EINTR
:表示操作被信号中断,继续尝试接受连接。- 其他错误码:记录警告信息,并跳出循环。
Reactor.hpp(重点)
一个使用epoll作为底层事件通知机制的网络服务器框架的核心部分。这个类管理着网络连接,并对这些连接上的事件进行监听和处理。
#pragma once
#include <iostream>
#include <unordered_map>
#include "Epoller.hpp"
#include "Connection.hpp"
//反应堆:本质是服务端,对connetion做了管理工作
class Reactor
{
const static int gnum=64;
public:
Reactor() : _isrunning(false)
{
}
//添加连接到反应堆上
void AddConnection(int fd, uint32_t events, func_t recver, func_t sender, func_t excepter)
{
// 1. 构建Connection
Connection *conn = new Connection(fd);
conn->SetEvents(events);//初始化事件
conn->Register(recver, sender, excepter);//注册方法
conn->SetSelf(this);
// 2. 对epoll添加新事件
_epller.AddEvent(conn->Sockfd(), conn->Events());
// 3. 向_connections添加connection对象
_connections.insert(std::make_pair(conn->Sockfd(), conn));
}
//判断是否有该连接
bool ConnectionIsExists(int sockfd)
{
auto iter = _connections.find(sockfd);
return iter != _connections.end();
}
//读写驱动的更改
void EnableReadWrite(int sockfd, bool readable, bool writeable)
{
uint32_t events = (readable?EPOLLIN:0) | (writeable ? EPOLLOUT : 0) | EPOLLET;
if(ConnectionIsExists(sockfd))
{
// 1. 修改我们写的connection关心的事件
_connections[sockfd]->SetEvents(events);
// 2. 写透到内核中
_epller.ModEvent(sockfd, events);
}
}
//移除连接
void RemoveConnection(int sockfd)
{
if(!ConnectionIsExists(sockfd)) return;
//去掉epoll中对事件的关心
_epller.DelEvent(sockfd);
//服务器关闭sockfd
_connections[sockfd]->Close();
//释放connections
delete _connections[sockfd];
_connections.erase(sockfd);
}
//单次循环处理事务
void LoopOnce(int timeout)
{
int n = _epller.Wait(revs, gnum, timeout);//等待事件的发生
for (int i = 0; i < n; i++)
{ //处理发生的事件
int sockfd = revs[i].data.fd;
uint32_t revents = revs[i].events;
if (revents & EPOLLHUP) //文件符被挂断时
revents |= (EPOLLIN | EPOLLOUT);
if (revents & EPOLLERR) //文件符出现错误时
revents |= (EPOLLIN | EPOLLOUT);
if (revents & EPOLLIN) //可读时
{
//判断连接是否存在并且
if (ConnectionIsExists(sockfd) && (_connections[sockfd]->_recver != nullptr))
{
_connections[sockfd]->_recver(_connections[sockfd]);//调用处理接收函数
}
}
if (revents & EPOLLOUT) //可写时
{
if (ConnectionIsExists(sockfd) && (_connections[sockfd]->_sender != nullptr))
{
_connections[sockfd]->_sender(_connections[sockfd]);
}
}
}
}
// 事件派发核心函数
void Dispatcher()
{
_isrunning = true;
//int timeout = -1;//表示阻塞等待事件的发生
int timeout = 3000;//3s为周期等待事件的发生
while (_isrunning)
{
LoopOnce(timeout);
// 处理其他事情
Debug();
}
_isrunning = false;
}
void Debug()
{
std::cout << "------------------------------------" << std::endl;
for(auto &connection : _connections)
{
std::cout << "fd : " << connection.second->Sockfd() << ", ";
uint32_t events = connection.second->Events();
if((events & EPOLLIN) && (events & EPOLLET))
std::cout << "EPOLLIN | EPOLLET, ";
if((events & EPOLLOUT) && (events & EPOLLET))
std::cout << "EPOLLOUT | EPOLLET";
std::cout << std::endl;
}
std::cout << "------------------------------------" << std::endl;
}
~Reactor() {}
private:
std::unordered_map<int, Connection *> _connections; // int : sockfd
struct epoll_event revs[gnum]; //事件信息的数组
Epoller _epller;//一个epoll
bool _isrunning;//是否运行
};
成员变量
_connections
: 一个unordered_map,用于存储与每个文件描述符(sockfd)相关联的Connection对象。revs
: 一个epoll_event数组,用于从epoll实例中接收事件。_epller
: 一个Epoller对象,负责与epoll接口进行交互。_isrunning
: 一个布尔值,表示Reactor是否正在运行。
成员函数
AddConnection
: 添加一个新的连接到Reactor中,包括构建Connection对象、设置事件、注册处理函数,并将连接添加到_connections映射中。同时,通过_epller对象将连接的文件描述符添加到epoll的监听列表中。ConnectionIsExists
: 检查给定的文件描述符是否存在于_connections映射中。EnableReadWrite
: 修改指定连接关心的事件(读或写),并更新epoll中的事件监听。RemoveConnection
: 从Reactor中移除一个连接,包括从epoll中删除事件监听、关闭文件描述符、删除Connection对象,并从_connections映射中移除。LoopOnce
: 等待并处理一次epoll事件循环中的事件。这包括读取事件、检查文件描述符的状态(如挂断或错误),并调用相应的处理函数(接收或发送)。Dispatcher
: Reactor的主循环函数,不断调用LoopOnce来处理事件,直到_isrunning变为false。Debug
: 打印当前Reactor中所有连接的状态和它们关心的事件。
Main.cc
这是所写头文件的逻辑思路
#include <iostream>
#include <memory>
#include "Reactor.hpp"
#include "Connection.hpp"
#include "Listener.hpp"
#include "PackageParse.hpp"
#include "HandlerConnection.hpp"
#include "Log.hpp"
int main(int argc, char *argv[])
{
if (argc != 2)
{
std::cout << "Usage: " << argv[0] << " port" << std::endl;
return 0;
}
uint16_t port = std::stoi(argv[1]);
EnableScreen();
std::unique_ptr<Reactor> react = std::make_unique<Reactor>(); // 主服务
HandlerConnection hc(PackageParse::Parse); //处理连接相关函数的对象
Listener listener(port,hc);//监听注册
//反应堆添加监听连接
react->AddConnection(
listener.Sockfd(),
EPOLLIN| EPOLLET,
std::bind(&Listener::Accepter,&listener,std::placeholders::_1),
nullptr,
nullptr
);
react->Dispatcher();//事件派发
}
- 创建一个Reactor类的智能指针实例,这是主服务组件。
- 创建一个HandlerConnection对象hc,它使用PackageParse::Parse函数来处理数据包的解析。
- 创建一个Listener对象listener,负责监听指定端口上的连接请求,并将新的连接请求通过hc(连接处理器)处理。
- 通过react->AddConnection()方法,将监听套接字(listener.Sockfd())注册到Reactor中,设置监听事件为读事件(EPOLLIN)和边缘触发模式(EPOLLET),并绑定Listener::Accepter方法作为事件处理函数。这里使用了std::bind来绑定Listener对象的Accepter成员函数。
- 调用react->Dispatcher()开始事件分发循环,这是Reactor模式的核心,它不断监听事件并调用相应的事件处理函数。
结果