🌠 作者:@阿亮joy.
🎆专栏:《学会Linux》
🎇 座右铭:每个优秀的人都有一段沉默的时光,那段时光是付出了很多努力却得不到结果的日子,我们把它叫做扎根
目录
- 👉Reactor 模式👈
- 什么是 Reactor 模式
- Reactor 模式的组件
- Reactor 模式的工作流程
- 👉使用 Reactor 模式设计 TcpServer👈
- socket 的封装
- 日志模块
- Epoll 模型的封装
- TcpServer 的设计
- 协议定制
- 服务端的编写
- 客户端的编写
- 综合演示
- 👉总结👈
👉Reactor 模式👈
什么是 Reactor 模式
Reactor 模式是一种事件驱动的设计模式,是将就绪的事件交给特定的事件处理器,用于实现高效的事件驱动程序。它主要用于网络编程中,用于处理大量的并发连接。
Reactor 模式的组件
Reactor 模式的核心思想是将 I / O 操作(如网络请求、文件操作等)抽象为事件,然后通过一个事件循环(Event Loop)来监视和分发这些事件。它的基本组成部分包括:
-
事件(Event):在网络服务器中,事件通常是指 socket 上的读事件、写事件和异常事件等。事件通常都需要和 socket 与事件处理器绑定在一起。事件可以在 I / O 通道状态发生变化时被触发,然后被事件分发器通知给事件循环。事件循环通过事件的类型和相关信息来确定哪个事件处理器应该处理这个事件。
-
事件处理器(Handler):每一种 I / O 事件都对应一个处理器,负责处理特定类型的事件。例如,网络连接事件、数据读取事件、数据写入事件等都可以有对应的事件处理器。
-
事件循环(Event Loop):是 Reactor 模式的核心部分,它不断地检查是否有新的事件发生,如果有就将事件分发给相应的事件处理器进行处理。事件循环通常以无限循环的形式运行,直到系统关闭。
-
事件分发器(Demultiplexer):用于监视多个 I / O 通道的状态,以确定哪些通道已经就绪(可读或可写)。这个组件可以使用操作系统提供的机制,如 select、poll、epoll 等。高性能的网络服务器使用的都是 epoll 接口。
Reactor 模式的工作流程
Reactor模式的工作流程如下:
-
应用程序初始化:创建事件处理器和事件循环,将事件处理器注册到事件循环中。
-
事件循环开始:事件循环开始无限循环,在每次循环中,它会通过事件分发器检查所有的I/O通道,看是否有事件就绪。
-
事件分发:当某个I/O通道就绪时,事件分发器将通知事件循环,事件循环根据就绪的通道找到对应的事件处理器,并将事件传递给它进行处理。
-
事件处理:事件处理器根据收到的事件类型执行相应的操作,这可能涉及读取数据、写入数据、连接管理等。
-
事件处理完成:事件处理器执行完毕后,将结果返回给事件循环。
通过这种方式,Reactor 模式可以实现高并发的 I / O 操作,因为它使用事件循环在单线程中处理多个 I / O 事件,避免了创建多个线程或进程,从而减少了资源开销和上下文切换的成本。
但是 Reactor 模式适用于 I / O 密集型的应用,但不适用于 CPU 密集型的场景,因为在事件处理过程中如果发生阻塞操作,会影响其他事件的处理。为了解决这个问题,可以将阻塞的操作委托给线程池等机制来处理。
👉使用 Reactor 模式设计 TcpServer👈
socket 的封装
#pragma once
#include <iostream>
#include <string>
#include <cstring>
#include <cerrno>
#include <cassert>
#include <unistd.h>
#include <memory>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <ctype.h>
#include <fcntl.h>
class Sock
{
private:
// listen的第二个参数是底层全链接队列的长度,其数值为listen的第二个参数+1
const static int gbackLog = 10;
public:
Sock()
{}
// 常见套接字
static int Socket()
{
int sock = socket(AF_INET, SOCK_STREAM, 0);
if(listen < 0) exit(2);
int opt = 1;
setsockopt(sock, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt));
return sock;
}
// 绑定 IP 地址和端口号
static void Bind(int sock, uint16_t port, std::string ip = "0.0.0.0")
{
struct sockaddr_in local;
memset(&local, 0, sizeof local);
local.sin_family = AF_INET;
local.sin_port = htons(port);
inet_pton(AF_INET, ip.c_str(), &local.sin_addr);
if(bind(sock, (struct sockaddr*)&local, sizeof(local)) < 0) exit(3);
}
static void Listen(int sock)
{
if(listen(sock, gbackLog) < 0)
exit(4);
}
// 一般经验
// const std::string &: 输入型参数
// std::string *: 输出型参数
// std::string &: 输入输出型参数
// 获取新连接
static int Accept(int listenSock, std::string* ip, uint16_t* port, int* acceptErrno)
{
struct sockaddr_in src;
socklen_t len = sizeof(src);
*acceptErrno = 0;
int serviceSock = accept(listenSock, (struct sockaddr*)&src, &len);
if(serviceSock < 0)
{
*acceptErrno = errno;
return -1;
}
if(port) *port = ntohs(src.sin_port);
if(ip) *ip = inet_ntoa(src.sin_addr);
return serviceSock;
}
// 连接服务器
static bool Connect(int sock, const std::string& serverIp, const uint16_t& serverPort)
{
struct sockaddr_in server;
memset(&server, 0, sizeof server);
server.sin_family = AF_INET;
server.sin_port = htons(serverPort);
server.sin_addr.s_addr = inet_addr(serverIp.c_str());
if(connect(sock, (struct sockaddr*)&server, sizeof server) == 0) return true;
else return false;
}
// 将 sock 设置为非阻塞,与 ET 模式配合使用
static bool SetNonBlock(int sock)
{
int fl = fcntl(sock, F_GETFL);
if(fl < 0) return false;
fcntl(sock, F_SETFL, fl | O_NONBLOCK);
return true;
}
~Sock()
{}
};
Sock 类主要是封装了 socket 的相关接口,主要是创建套接字、绑定 IP 地址和端口号、将套接字设置为监听套接字、连接服务器、服务器获取新连接以及将套接字设置为非阻塞等等。
日志模块
#pragma once
#include <cstdio>
#include <cstdarg>
#include <string>
#include <iostream>
#include <poll.h>
#include <ctime>
// 日志等级
#define DEBUG 0
#define NORMAL 1
#define WARNING 2
#define ERROR 3
#define FATAL 4
#define LOGFILE "./ThreadPool.log"
const char* levelMap[] =
{
"DEBUG",
"NORMAL",
"WARNING",
"ERROR",
"FATAL"
};
void logMessage(int level, const char* format, ...)
{
char stdBuffer[1024]; // 标准部分
time_t timestamp = time(nullptr);
// struct tm *localtime = localtime(×tamp);
snprintf(stdBuffer, sizeof stdBuffer, "[%s] [%ld] ", levelMap[level], timestamp);
char logBuffer[1024]; // 自定义部分
va_list args; // va_list就是char*的别名
va_start(args, format); // va_start是宏函数,让args指向参数列表的第一个位置
// vprintf(format, args); // 以format形式向显示器上打印参数列表
vsnprintf(logBuffer, sizeof logBuffer, format, args);
va_end(args); // va_end将args弄成nullptr
// FILE* fp = fopen(LOGFILE, "a");
printf("%s%s\n", stdBuffer, logBuffer);
// fprintf(fp, "%s%s\n", stdBuffer, logBuffer); // 向文件中写入日志信息
// fclose(fp);
}
日志模块在之前的博客中已经多次提及到,就不再赘述了。
Epoll 模型的封装
#pragma once
#include <iostream>
#include <sys/epoll.h>
class Epoll
{
private:
const static int defaultSize = 128;
const static int defaultTimeOut = 3000;
public:
Epoll(int timeout = defaultTimeOut)
: _timeout(timeout)
{}
~Epoll()
{
if(_epfd >= 0) close(_epfd);
}
// 创建 epoll 模型
void CreateEpoll()
{
_epfd = epoll_create(defaultSize);
if(_epfd < 0) exit(5);
}
// 将 socket 添加到 epoll 模型中
bool AddSockToEpoll(int sock, uint32_t events)
{
events |= EPOLLET; // ET 模式
struct epoll_event ev;
ev.data.fd = sock;
ev.events = events;
int n = epoll_ctl(_epfd, EPOLL_CTL_ADD, sock, &ev);
return n == 0;
}
bool DelSockFromEpoll(int sock)
{
int n = epoll_ctl(_epfd, EPOLL_CTL_DEL, sock, nullptr);
return n == 0;
}
int WaitEpoll(struct epoll_event* readyEvents, int maxEvents)
{
return epoll_wait(_epfd, readyEvents, maxEvents, _timeout);
}
bool CtlEpoll(int sock, uint32_t events)
{
events |= EPOLLET;
struct epoll_event ev;
ev.data.fd = sock;
ev.events = events;
int n = epoll_ctl(_epfd, EPOLL_CTL_MOD, sock, &ev);
return n == 0;
}
private:
int _epfd;
int _timeout;
};
Epoll 类主要是封装了 epoll 的相关接口,方便 TcpServer 服务器进行调用。
- CreateEpoll:TcpServer 调用该接口,即可创建一个 epoll 模型。
- AddSockToEpoll:将 TcpServer 需要关心该 sock 的 events 事件添加到 epoll 模型中,让 epoll 来关心这些事件,事件发生后通知 TcpServer 进行处理。
- DelSockFromEpoll:让 epoll 模型不再关心 sock 上发生的事件。
- WaitEpoll:让 epoll 模型等待事件的发生,事件发生后通知 TcpServer 进行处理。
- CtlEpoll:修改 epoll 模型对 sock 上发生的事件的关心。比如,之前只让 epoll 模型关心 sock 的读事件,现在让 epoll 模型也关心 sock 的写事件。
TcpServer 的设计
TcpServer 需要处理的事件
- 读事件:如果监听套接字的读事件就绪,则调用封装好 Accepter 函数获取底层建立好的连接,并进行相应的处理。而如果是普通套接字的读事件就绪,则调用封装好的 Recver 函数读取客户单发来的数据,并进行业务处理。
- 写事件:当写事件就绪时,就调用封装好的 Sender 函数写入到底层 TCP 的发送缓冲区中。
- 异常事件:当某个文件描述符的异常事件发生了,我们直接将该文件描述符关闭掉,不进行过多的处理。
Connection 类的封装
#pragma once
#include <iostream>
#include <functional>
#include <vector>
#include <string>
#include <unordered_map>
#include <cassert>
#include <cerrno>
#include "Log.hpp"
#include "Sock.hpp"
#include "Epoll.hpp"
#include "Protocol.hpp"
// 前置声明
class TcpServer;
class Connection;
using func_t = std::function<void(Connection*)>;
using callback_t = std::function<void(Connection*, std::string&)>;
// 使用 Connection 来表示一个连接
// 该连接有自己的接收缓冲区和发送缓冲区
class Connection
{
public:
Connection(int sock = -1)
: _sock(sock)
, _ptr(nullptr)
{}
// 设置该连接的回调函数
void SetCallBack(func_t readCallBack, func_t writeCallBack, func_t exceptCallBack)
{
_readCallBack = readCallBack;
_writeCallBack = writeCallBack;
_exceptCallBack = exceptCallBack;
}
~Connection()
{
if(_sock >= 0) close(_sock);
}
public:
// 负责进行 IO 的文件描述符
int _sock;
// 三个回调方法,表示对 _sock 的读事件、写事件和异常事件的处理方法
func_t _readCallBack;
func_t _writeCallBack;
func_t _exceptCallBack;
// 每个连接独占的接收缓冲区和发送缓冲区
std::string _inBuffer; // 暂时没有办法处理二进制流,文本数据是可以处理的
std::string _outBuffer;
// TcpServer 的回指指针,用于业务处理
TcpServer* _ptr;
};
Connection 类中除了包含文件描述符和其对应的读时间回调、写事件回调和异常事件回调之外,还包含一个输入缓冲区 _inBuffer、一个输出缓冲区 _outBuffer 以及一个回指指针 _ptr。
- 当某个文件文件描述符的读事件就绪时,我们会调用 recv 函数读取客户端发过来的数据,但是这并不能保证我们读取到了一个完整的报文,因此我们需要将读取到的数据暂时存放在该文件描述符对应的接收缓冲区 _inBuffer 中。当 _inBuffer 中的数据能够分离出一个完整的报文,我们再进行业务处理,所以 _inBuffer 的本质就是用来解决粘包问题的。
- 当处理完一个网络请求后,需要将响应的数据发送给客户端,但是我们并不能保证底层 TCP 的发送缓冲区中有足够的空间供我们写入,因此需要将要发送的响应数据暂时保存在该文件描述符对应的发送缓冲区 _outBuffer 中。等待底层 TCP 发送缓冲区中有空间时,再将 _outBuffer 中的数据发送出去。
- Connection 类中还包含了指向 TcpServer 的回指指针,便于我们能够快速找到 TcpServer,这种操作在开源项目中非常常见,有利于我们进行软件分层。业务处理完,需要将响应发给客户端,此时我们就可以开启对文件描述符写事件的关心,然后就能够触发一次写事件就绪将响应发送出去。
TcpServer 的构造和析构
class TcpServer
{
private:
const static int defaultPort = 8080;
const static int defaultMaxEvents = 128;
public:
TcpServer(int port = defaultPort, int maxEvents = defaultMaxEvents)
: _port(port), _maxEvents(maxEvents)
{
// 1. 创建监听套接字
_listenSock = Sock::Socket();
Sock::Bind(_listenSock, _port);
Sock::Listen(_listenSock);
// 2. 创建 epoll 模型
_epoll.CreateEpoll();
// 3. 将 _listenSock 封装成 Connection 添加到 TcpServer 中
// _listenSock 只需要关心读事件即可
AddConnection(_listenSock, std::bind(&TcpServer::Accepter, this, std::placeholders::_1), nullptr, nullptr);
// 4. 申请保存就绪事件的数组
_readyEvents = new struct epoll_event[_maxEvents];
logMessage(NORMAL, "Init TcpServer Success");
}
~TcpServer()
{
if (_listenSock >= 0)
close(_listenSock);
if (_readyEvents)
delete[] _readyEvents;
}
private:
int _listenSock; // 监听套接字
int _port; // 端口号
Epoll _epoll;
std::unordered_map<int, Connection *> _connections; // 用来管理连接的哈希表
struct epoll_event *_readyEvents; // 保存就绪事件的数组,可以用 vector 代替
int _maxEvents; // _readyEvents 的最大容量
// 业务处理的回调函数
callback_t _callBack;
};
在 TcpServer 类中,除了必要的服务器的端口号、监听套接字之外,还有封装好的 _epoll、用来管理连接的哈希表以及业务处理的回调函数等等。
- 在构造函数中,主要做的事件就是创建监听套接字、创建 epoll 模型、调用 AddConnetion 函数设置监听套接字的读事件的回调函数并将监听套接字封装成 Connection 保存到哈希表 _connections 中进行管理、申请空间用来保存就绪的事件。
- 在析构函数中,需要调用 close 函数来关闭监听套接字,还需要释放用来保存就绪事件的空间。
- 设置业务处理的回调函数的目的是让网络层的代码和业务层的代码解耦,避免强耦合。这样做的好处就是修改业务层的代码,并不会影响到网络层的代码。
AddConnetion 函数的介绍
class TcpServer
{
// ...
private:
// 将连接添加到 TcpServer 中
void AddConnection(int sock, func_t readCallBack, func_t writeCallBack, func_t exceptCallBack)
{
/* ************************************************************
* TcpServer 上将会存在大量的连接,也就是会存在大量的 Connection
* 那么 TcpServer 就需要采用先描述再组织的方式来将大量的 Connection
* 管理起来,在这里我是要哈希表的方式来管理
* ************************************************************
*/
// 1. 将文件描述符设置为非阻塞(ET模式)
Sock::SetNonBlock(sock);
// 2. 构建 Connection 对象
Connection *conn = new Connection(sock);
conn->SetCallBack(readCallBack, writeCallBack, exceptCallBack);
conn->_ptr = this;
// 3. 将 sock 添加到 epoll 中
// 任何的多路转接服务器一般默认只会打开对
// 读事件的关心,对写事件的关心按需打开
_epoll.AddSockToEpoll(sock, EPOLLIN | EPOLLET);
// 4. 保存文件描述符与 Connection 的映射关系
_connections[sock] = conn;
}
// ...
};
- 需要给 AddConnetion 函数传入四个参数,分别是套接字 sock,读事件回调 readCallBack、写事件回调 writeCallBack 以及异常事件回调 exceptCallBack。
- 由于 TcpServer 采用的是 ET 模式的 epoll 模型,所以需要将套接字 sock 设置为非阻塞。
- 申请 Connection 对象保存套接字 sock,设置 Connection 的相关回调,然后设置指向 TcpServer 的回指指针。
- 将 sock 添加到 epoll 模型中,让 epoll 来帮我们关心 sock 上发送的事件。需要注意的是,我们只需要让 epoll 关心 sock 的读事件即可,写事件按照需要来进行开启。
- 最后,需要将 sock 和 Connection 的映射关系保存到哈希表中。
事件分发 Dispather
class TcpServer
{
// ...
public:
// 根据就绪的事件进行特定事件的派发
void Dispacther(callback_t callBack)
{
_callBack = callBack;
while (true)
{
LoopOnce();
}
}
private:
void LoopOnce()
{
int n = _epoll.WaitEpoll(_readyEvents, _maxEvents);
for (int i = 0; i < n; ++i)
{
int sock = _readyEvents[i].data.fd; // 就绪的文件描述符
uint32_t revents = _readyEvents[i].events; // 就绪的事件
// 将所有的异常都交给Recver或Sender来处理
if (revents & EPOLLERR) // 文件描述符发生了错误
revents |= (EPOLLIN | EPOLLOUT);
if (revents & EPOLLHUP) // 对端关闭了连接
revents |= (EPOLLIN | EPOLLOUT);
// 如果连接存在且设置了相应的回调方法,才可以调用回调方法
if (revents & EPOLLIN)
{
if (ConnectionIsExists(sock) && _connections[sock]->_readCallBack != nullptr)
_connections[sock]->_readCallBack(_connections[sock]);
}
if (revents & EPOLLOUT)
{
if (ConnectionIsExists(sock) && _connections[sock]->_writeCallBack != nullptr)
_connections[sock]->_writeCallBack(_connections[sock]);
}
}
}
// 检查连接是否存在
bool ConnectionIsExists(int sock)
{
return _connections.count(sock) == 1;
}
// ...
};
- Dispacther 函数需要调用者传入一个回调函数,这个回调函数是用于处理相关业务的,只要传入的回调函数不同,就能够处理不同的业务了。保存好用于处理业务的回调函数后,就需要开启事件循环了。事件循环通常以死循环的形式运行,也就是一直调用 LoopOnce 函数。
- 在 LoopOnce 函数中,主要是调用 WaitEpoll 函数等待事件的发生。当有事件发生后,如果 sock 在哈希表中并设置了相应的回调函数,则调用相应的回调函数,也就是事件处理。
Accepter 函数的介绍
class TcpServer
{
// ...
private:
// _listenSock 用来接收新连接的回调方法
void Accepter(Connection *conn)
{
// 来到这里说明有新的连接到来,因为不知道底层有多少个连
// 接,所以需要通过while循环一直获取连接。如果不是这样
// 的话,可能会导致有些客户端长时间连接不上服务器的问题
while (true)
{
std::string clientIp;
uint16_t clientPort;
int acceptErrno = 0;
int sock = Sock::Accept(_listenSock, &clientIp, &clientPort, &acceptErrno);
if (sock < 0)
{
if (acceptErrno == EAGAIN || acceptErrno == EWOULDBLOCK)
break; // 底层的连接已经全部获取完了,可以退出循环了
else if (acceptErrno == EINTR)
continue; // 被信号中断,可以继续获取连接
else
{
// accept失败
logMessage(WARNING, "Accepter Error:%d %s", acceptErrno, strerror(acceptErrno));
break;
}
}
// 将 sock 添加到 TcpServer 中
if (sock >= 0)
{
AddConnection(sock, std::bind(&TcpServer::Recver, this, std::placeholders::_1),
std::bind(&TcpServer::Sender, this, std::placeholders::_1),
std::bind(&TcpServer::Excepter, this, std::placeholders::_1));
logMessage(DEBUG, "Accept Client[%s:%d:%d] Success", clientIp.c_str(), clientPort, sock);
}
}
}
// ...
};
- 在 Accepter 函数中,主要就是将底层建立好的连接全部获取上来,然后调用 AddConnection 设置 sock 的事件处理器并将连接保存在哈希表中管理起来。
- 当 Accept 函数的返回值为 -1 时,需要检查错误码。当错误码为 EAGAIN 或 EWOULDBLOCK 时,则表示底层的连接全部获取完了,可以退出循环了。当错误码为 EINTR 时,则表明获取连接时收到了信号,继续循环即可。当错误码为其他时,则表明真的遇到了错误,那么就打印错误信息并退出循环。
Recver 函数的介绍
class TcpServer
{
// ...
private:
// 普通套接字读事件就绪的回调方法
void Recver(Connection *conn)
{
const int num = 1024;
bool err = false; // 表示对端是否关闭连接
while (true)
{
char buffer[num];
ssize_t n = recv(conn->_sock, buffer, sizeof(buffer) - 1, 0);
if (n < 0)
{
if (errno == EAGAIN || errno == EWOULDBLOCK)
break;
else if (errno == EINTR)
continue;
else
{
logMessage(ERROR, "Recver Error:%d %s", errno, strerror(errno));
conn->_exceptCallBack(conn);
err = true;
break;
}
}
else if (n == 0)
{
logMessage(DEBUG, "Client[%d] Quit, Me Too", conn->_sock);
conn->_exceptCallBack(conn);
err = true;
break;
}
else
{
// 读取底层数据成功
buffer[n] = 0;
conn->_inBuffer += buffer; // 将读取到的事件添加到接受缓存中
}
}
if (!err)
logMessage(DEBUG, "Client[%d] Send Message:%s", conn->_sock, conn->_inBuffer.c_str());
if (!err)
{
std::vector<std::string> messages;
// 对 _inBuffer 进行切分,并将切分结果放入到 messages 中
SpliteMessage(conn->_inBuffer, messages);
// 将分割出来的独立报文交给具体业务进行处理
// 在这里可以将message封装成为task,然后push
// 到任务队列,任务处理交给后端线程池
for (auto &msg : messages)
_callBack(conn, msg); // msg 就是一个完整的报文
}
}
// ...
};
- Recver 函数是普通套接字读事件就绪的回调方法。
- 在 Recver 函数中,主要是调用 recv 函数将底层收到的数据获取上来,然后根据 recv 函数的返回值 n 来进行具体的操作。
- 当 n 大于 0 时,则说明获取底层的数据成功。但这并不意味着底层的数据全部获取完了,所以还需要进行读取,不能退出循环。当 n 等于 0 时,则说明对端关闭连接,需要调用异常事件处理函数 Excepter 进行相应的异常处理,然后直接 return 返回。当 n 小于 0 时,则需要关心错误码 errno 了。当 errno 为 EAGAIN 或 EWOULDBLOCK 时,则表示底层的数据全部获取完了,可以退出循环了。当 errno 为 EINTR 时,则表明获取数据时收到了信号,继续循环即可。当 errno 为其他时,则表明真的遇到了错误,那么就打印错误信息,调用 Excepter 函数,然后直接 return 返回。
- 当退出 while 循环时,则需要对获取到的数据进行解析,然后再进行相关的业务处理。
-注:对端关闭连接,也会让文件描述符的读事件就绪。
Sender 函数的介绍
class TcpServer
{
// ...
public:
void EnableReadWrite(Connection *conn, bool readAble, bool writeAble)
{
uint32_t events = ((readAble ? EPOLLIN : 0) | (writeAble ? EPOLLOUT : 0));
bool ret = _epoll.CtlEpoll(conn->_sock, events);
assert(ret);
}
private:
void Sender(Connection *conn)
{
while (true)
{
ssize_t n = send(conn->_sock, conn->_outBuffer.c_str(), conn->_outBuffer.size(), 0);
if (n > 0)
{
conn->_outBuffer.erase(0, n);
if (conn->_outBuffer.empty())
break;
}
else
{
if (errno == EAGAIN || errno == EWOULDBLOCK) // 底层TCP的发送缓冲区没有空间了
break;
else if (errno == EINTR)
continue;
else
{
logMessage(ERROR, "Sender Error:%d %s", errno, strerror(errno));
conn->_exceptCallBack(conn);
return;
}
}
}
// 如果数据发送完了,则需要取消对conn->_sock写事件的关心
// 如果数据没有发送完,则不需要取消对conn->_sock写事件的关心
if (conn->_outBuffer.empty())
EnableReadWrite(conn, true, false);
else
EnableReadWrite(conn, true, true);
}
// ...
};
- Sender 函数是将响应发回给客户端,但是发送可能会出现各种情况,因此我们要对 send 的返回值 n 进行判断。
- 当 n 大于 0 时,则说明数据已经成功拷贝到底层 TCP 的发送缓冲区中了。当 n 小于等于 0 时,则需要判断错误码。当错误码为 EAGAIN 或 EWOULDBLOCK 时,则说明底层 TCP 的发送缓冲区已经满了,退出循环,下次再调用 send。当错误码为 EINTR 时,则说明收到了信号,继续循环调用 send。当错误等于其他时,则说明遇到了错误,调用 Excepter 函数,然后直接 return 返回。
- 退出 while 循环后,如果 Connection 发送缓冲区中的数据都拷贝到底层 TCP 的发送缓冲区了,则关闭对该文件描述符写事件的关心。否则,保持对该文件描述符写事件的关心。
Excepter 函数的介绍
class TcpServer
{
// ...
private:
void Excepter(Connection *conn)
{
if (!ConnectionIsExists(conn->_sock))
return;
// 1. 从epoll中移除conn->_sock
int n = _epoll.DelSockFromEpoll(conn->_sock);
assert(n);
(void)n;
// 2. 从_connections中移除conn->_sock
_connections.erase(conn->_sock);
// 3. 释放conn所占用的资源(Connect的析构函数会关闭文件描述符)
delete conn;
}
// ...
};
- Excepter 函数的主要工作就是移除 epoll 模型对文件描述符的关心、从哈希表中删除文件描述符以及释放 Connection。
协议定制
#pragma once
#include <iostream>
#include <vector>
#include <string>
#include <cstring>
// 使用特殊字符作为分隔符来解决粘包问题,处理独立报文
#define SEP "X"
#define SEP_LEN strlen(SEP)
// 用于序列化和反序列化
#define SPACE " "
#define SPACE_LEN strlen(SPACE)
// buffer: 输入输出型参数
// out: 输出型参数
// buffer中可能会存在多个报文,没有报文是以SEP作为分隔符的
// 我们想要将不同的报文切分好,放入到out中
void SpliteMessage(std::string& buffer, std::vector<std::string>& out)
{
while(true)
{
auto pos = buffer.find(SEP);
if(pos == std::string::npos) break;
std::string message = buffer.substr(0, pos);
buffer.erase(0, pos + SEP_LEN);
out.emplace_back(message);
}
}
// 给报文加上分隔符
std::string Encode(std::string& s)
{
return s + SEP;
}
// 去除报文中的分割符
std::string Decode(std::string& s)
{
auto pos = s.find(SEP);
// s中没有一个完整的报文时,返回空串
// s中有一个完整的报文时,返回该报文
if(pos == std::string::npos) return "";
std::string ret = s.substr(0, pos);
s.erase(0, pos + SEP_LEN);
return ret;
}
// 网络版计算器的 Request 和 Response
class Request
{
public:
// 序列化
std::string Serialize()
{
std::string str;
str = std::to_string(_x);
str += SPACE;
str += _op;
str += SPACE;
str += std::to_string(_y);
return str;
}
// 反序列化
bool Deserialized(const std::string &str)
{
std::size_t left = str.find(SPACE);
if (left == std::string::npos)
return false;
std::size_t right = str.rfind(SPACE);
if (right == std::string::npos)
return false;
_x = atoi(str.substr(0, left).c_str());
_y = atoi(str.substr(right + SPACE_LEN).c_str());
if (left + SPACE_LEN > str.size())
return false;
else
_op = str[left + SPACE_LEN];
return true;
}
public:
Request()
{}
Request(int x, int y, char op)
: _x(x)
, _y(y)
, _op(op)
{}
~Request() {}
public:
int _x;
int _y;
char _op;
};
class Response
{
public:
// 序列化
std::string Serialize()
{
std::string s;
s = std::to_string(_code);
s += SPACE;
s += std::to_string(_ret);
return s;
}
// 反序列化
bool Deserialized(const std::string &s)
{
std::size_t pos = s.find(SPACE);
if (pos == std::string::npos)
return false;
_code = atoi(s.substr(0, pos).c_str());
_ret = atoi(s.substr(pos + SPACE_LEN).c_str());
return true;
}
public:
Response()
{}
Response(int ret, int code)
: _ret(ret)
, _code(code)
{}
~Response() {}
public:
int _ret; // 计算结果
int _code; // 计算结果的状态码
};
我们的业务还是一个网络版的计算器,我们需要定制协议来解决粘包问题。
- SpliteMessage 函数的作用就是以 SEP 为分割符,将 Connection 的接受缓冲区中的数据分割成一个个的报文,然后交给业务层进行处理。
- Encode 函数的作用是给报文加上分割符,以明确报文和报文之间的边界,以解决粘包问题。Decode 函数的作用是去除报文中的分割符并返回一个完整的报文,如果没有一个完整的报文,则返回空串。
服务端的编写
#include "TcpServer.hpp"
#include <memory>
#include <unordered_map>
#include <fstream>
static Response Calculator(const Request &req)
{
Response resp(0, 0);
switch (req._op)
{
case '+':
resp._ret = req._x + req._y;
break;
case '-':
resp._ret = req._x - req._y;
break;
case '*':
resp._ret = req._x * req._y;
break;
case '/':
if (req._y == 0)
resp._code = 1; // 除零错误
else
resp._ret = req._x / req._y;
break;
case '%':
if (req._y == 0)
resp._code = 2; // 模零错误
else
resp._ret = req._x % req._y;
break;
default:
resp._code = 3;
break;
}
return resp;
}
void NetCal(Connection* conn, std::string& request)
{
logMessage(DEBUG, "NetCal Been Called, Get Request: %s", request.c_str());
// 1. 反序列化
Request req;
if(!req.Deserialized(request))
{
// 按道理来说,来到这里收到的一定是一个完整的报文
// 但现在却反序列化失败,输出一下错误日志
logMessage(ERROR, "Request Deserialized Error");
return;
}
// 2. 业务处理
Response resp = Calculator(req);
// 3. 序列化,构建应答
std::string sendStr = resp.Serialize();
sendStr = Encode(sendStr);
// 4. 业务层不需要关心数据如何发送给客户端,只需要将序
// 列化后的应答交给TcpServer,让它将应答发送给客户端
conn->_outBuffer += sendStr;
// 一旦开启对EPOLLOUT的关心,epoll会立即触发一次发送事件就绪
// 如果后续保持对EPOLL的关心,TcpServer会一直进行发送
conn->_ptr->EnableReadWrite(conn, true, true);
}
int main()
{
std::unique_ptr<TcpServer> svr(new TcpServer());
svr->Dispather(NetCal);
return 0;
}
创建一个 TcpServer,然后调用它的 Dispatcher 函数。调用 Dispatcher 函数时,需要传入业务处理的回调函数。传入的回调函数不同,就可以处理不同的业务了。
客户端的编写
#include <iostream>
#include <string>
#include <ctime>
#include "Sock.hpp"
#include "Protocol.hpp"
// 客户端调用
void Send(int sock, const std::string& sendStr)
{
int n = send(sock, sendStr.c_str(), sendStr.size(), 0);
if(n != sendStr.size())
std::cerr << "Send Error" << std::endl;
}
// 客户端调用
bool Recv(int sock, std::string& out)
{
char buffer[1024];
while(true)
{
ssize_t n = recv(sock, buffer, sizeof(buffer) - 1, 0);
if(n > 0)
{
buffer[n] = 0;
out += buffer;
}
else if(n == 0)
{
return false;
}
else
{
if(errno == EAGAIN || errno == EWOULDBLOCK)
{
if(out.empty()) continue; // 如果没有收到响应,则继续对应 recv 函数
return true;
}
else if(errno == EINTR)
{
continue;
}
else
{
std::cerr << "Recv Error" << std::endl;
return false;
}
}
}
}
static void Usage(char* proc)
{
std::cout << "\nUsage: " << proc << "serverIp serverPort" << std::endl;
}
// 网络版计算器客户端
int main(int argc, char* argv[])
{
srand((unsigned)time(nullptr));
if(argc != 3)
{
Usage(argv[0]);
exit(1);
}
int sock = Sock::Socket();
if(!Sock::Connect(sock, std::string(argv[1]), atoi(argv[2])))
{
std::cerr << "Connect Error: " << errno << " " << strerror(errno) << std::endl;
}
const char* op = "+-*/%#";
Sock::SetNonBlock(sock);
std::string buffer;
while(true)
{
Request req;
// req._x = rand() % 100;
// req._y = rand() % 100;
// req._op = op[rand() % 6];
std::cout << "Please Enter x: ";
std::cin >> req._x;
std::cout << "Please Enter op: ";
std::cin >> req._op;
std::cout << "Please Enter y: ";
std::cin >> req._y;
// 序列化
std::string sendStr = req.Serialize();
sendStr = Encode(sendStr);
// 发送请求
Send(sock, sendStr);
// 非阻塞读取全部数据
bool ret = Recv(sock, buffer);
if(!ret) break; // 服务器关闭连接或者Recv出错了
std::string package = Decode(buffer);
if(package.empty()) continue;
// 接收到了一个完整的报文
Response resp;
resp.Deserialized(package);
std::string err;
switch(resp._code)
{
case 1:
err = "Division By Zero Error";
break;
case 2:
err = "Modular Division By Zero Error";
break;
case 3:
err = "Operation Error";
break;
default:
std::cout << "Calculation Result: " << resp._ret << std::endl;
break;
}
if(!err.empty()) std::cout << err << std::endl;
}
close(sock);
return 0;
}
综合演示
将业务替换成在线词典
服务端
#define FILENAME "OnlineDict.txt"
// 加载单词
void LoadWord(std::unordered_map<std::string, std::string>& m)
{
std::ifstream ifs(FILENAME);
std::string key, value;
while(ifs >> key >> value)
{
m[key] = value;
}
}
void OnlineDict(Connection* conn, std::string& request)
{
static std::unordered_map<std::string, std::string> m;
// 加载单词且只加载一次
if(m.empty()) LoadWord(m);
// 观察单词是否加载成功
// for(auto& kv : m)
// {
// std::cout << kv.first << ":" << kv.second << std::endl;
// }
logMessage(DEBUG, "NetCal Been Called, Get Request: %s", request.c_str());
// request就是要查找的单词
auto it = m.find(request);
std::string sendStr;
if(it == m.end())
sendStr = request + " Is Not Found";
else
sendStr = request + " Means " + m[request];
sendStr = Encode(sendStr); // 添加分隔符
conn->_outBuffer += sendStr;
// 一旦开启对EPOLLOUT的关心,epoll会立即触发一次发送事件就绪
// 如果后续保持堆EPOLL的关心,TcpServer会一直进行发送
conn->_ptr->EnableReadWrite(conn, true, true);
}
int main()
{
std::unique_ptr<TcpServer> svr(new TcpServer());
svr->Dispacther(OnlineDict);
return 0;
}
客户端
int main(int argc, char* argv[])
{
srand((unsigned)time(nullptr));
if(argc != 3)
{
Usage(argv[0]);
exit(1);
}
int sock = Sock::Socket();
if(!Sock::Connect(sock, std::string(argv[1]), atoi(argv[2])))
{
std::cerr << "Connect Error: " << errno << " " << strerror(errno) << std::endl;
}
Sock::SetNonBlock(sock);
std::string word;
std::string buffer;
while(true)
{
std::cout << "Please Enter The Word: ";
std::cin >> word;
word = Encode(word);
Send(sock, word);
// 非阻塞读取全部数据
bool ret = Recv(sock, buffer);
if(!ret) break; // 服务器关闭连接或者Recv出错了
std::string package = Decode(buffer);
std::cout << package << std::endl;
}
return 0;
}
👉总结👈
本篇博客主要讲解了什么是 Reactor 模式、Reactor 模式的组件、Reactor 模式的工作流程以及基于 Reactor 模式的 TCP 服务器等等。以上就是本篇博客的全部内容了,如果大家觉得有收获的话,可以点个三连支持一下!谢谢大家啦!💖💝❣️