目录
一、poll
二、epoll
1.epoll
2.epoll的函数接口
①epoll_create
②epoll_ctl
③epoll_wait
3.操作原理
三、epoll服务器编写
1.日志打印
2.TCP服务器
3.Epoll
①雏形
②InitEpollServer 与 RunServer
③HandlerEvent
四、Epoll的工作模式
1.LT模式与ET模式
2.基于LT模式的epoll服务器
①整体框架
②处理BUG
③优化结构
④异常处理
④序列化与反序列化
⑤优化
⑥Reactor模式与Proactor模式
一、poll
该函数的作用也是如同select一样在IO中负责等,但是它不用每次将参数重设,并且没有上限。
这需要参数的帮助。
第一个参数的类型是一个指针,其中存放的类型是一个结构体。
fd为文件描述符,events是负责告诉内核关注什么事件,revents是负责告诉用户关注的事件是否就绪。替代了select中输入输出参数为同一个的问题。我们还可以将参数稍作调整更为直观。
int poll(struct pollfd fds[],nfds_t nfds,int timeout);
我们要关注读事件写事件怎么传呢?
这些都是宏,我们想关注多个事件可以将他们对应的宏按位与在一起,再传入。
第二个参数的类型是nfds_t,实则是long int。
该参数传递的是前一个参数作为数组中的元素个数。
第三个参数,与select中的timeout不同的是 ,这里的timeout的参数不是结构体,直接表示的是微秒。
关于poll的服务器编写就不多阐述了,我们将上篇文章的select稍微修改下,就能使用。大家可以移步先去观看上篇文章。
代码:
#include <iostream>
#include <poll.h>
#include "Sock.hpp"
using namespace std;
#define NUM 1024
struct pollfd fdsArray[NUM]; // 辅助数组 里面存放历史文件描述符
#define DEFAUIT -1 // 默认
void Usage(string process)
{
cout << "Please entry" << process << " port" << endl;
}
static void ShowArray()
{
cout << "当前的文件描述符为: ";
for (int i = 0; i < NUM; i++)
{
if (fdsArray[i].fd == DEFAUIT)
continue;
cout << fdsArray[i].fd << ' ';
}
cout << endl;
}
static void HandlerEvent(int listensock)
{
for (int j = 0; j < NUM; j++)
{
if (fdsArray[j].fd == DEFAUIT)
continue;
if (j == 0 && fdsArray[j].fd == listensock)
{
if (fdsArray[j].revents & POLLIN)
{
cout << "新连接到来,需要处理" << endl;
string clientip;
uint16_t clientport = 0;
int sock = Sock::Accept(listensock, &clientip, &clientport); // 这里不会阻塞
if (sock < 0)
return;
cout << "获取新连接成功 " << clientip << ":" << clientport << " Sock:" << sock << endl;
int i = 0;
for (; i < NUM; i++)
{
if (fdsArray[i].fd == DEFAUIT)
break;
}
if (i == NUM)
{
cerr << "服务器已经到达了上限" << endl;
close(sock);
}
else
{
// 将文件描述符放入fdsArray中
fdsArray[i].fd = sock;
fdsArray[i].events = POLLIN;
fdsArray[i].revents = 0;
// debug
ShowArray();
}
}
}
else
{
// 处理其他的文件描述符的IO事件
if (fdsArray[j].revents & POLLIN)
{
char buffer[1024];
ssize_t s = recv(fdsArray[j].fd, buffer, sizeof(buffer), 0);
// 这里的阻塞读取真的会阻塞住吗?并不会,因为走到这里select已经帮我们等了,并且此时事件就绪。
if (s > 0)
{
buffer[s] = 0;
cout << "client[" << fdsArray[j].fd << "]"
<< " # " << buffer << endl;
}
else if (s == 0)
{
cout << "client[" << fdsArray[j].fd << "] "
<< "quit"
<< " server will close " << fdsArray[j].fd << endl;
fdsArray[j].fd = DEFAUIT; // 恢复默认
fdsArray[j].events = 0;
fdsArray[j].revents = 0;
close(fdsArray[j].fd); // 关闭sock
ShowArray(); // debug
}
else
{
cerr << "recv error" << endl;
fdsArray[j].fd = DEFAUIT; // 恢复默认
fdsArray[j].events = 0;
fdsArray[j].revents = 0;
close(fdsArray[j].fd); // 关闭sock
ShowArray(); // debug
}
}
}
}
}
int main(int argc, char **argv)
{
if (argc != 2)
{
Usage(argv[0]);
exit(1);
}
int listensocket = Sock::Socket();
Sock::Bind(listensocket, atoi(argv[1]));
Sock::Listen(listensocket);
for (int i = 0; i < NUM; i++)
{
fdsArray[i].fd = DEFAUIT;
fdsArray[i].events = 0;
fdsArray[i].revents = 0;
}
fdsArray[0].fd = listensocket; // 默认fdsArray第一个元素存放
fdsArray[0].events = POLLIN;
int timeout = 100000;
while (1)
{
int n = poll(fdsArray, NUM, timeout);
switch (n)
{
case 0:
// timeout
cout << "timeout ... : " << (unsigned int)time(nullptr) << endl;
break;
// error
cout << "select error : " << strerror(errno) << endl;
case -1:
break;
default:
// 等待成功
HandlerEvent(listensocket);
break;
}
}
}
现象:
二、epoll
1.epoll
按照man手册中说法为:是为处理大批量句柄而做了改进的poll。
epoll被公认为性能最好的多路I/O就绪通知方法。
2.epoll的函数接口
①epoll_create
该函数的目的是创建一个epoll句柄。
自从linux2.6.8之后,size参数是被忽略的。
返回值:成功时返回一个文件描述符,失败时返回-1。
②epoll_ctl
该函数的作用是为对epoll句柄中添加特点的文件描述符对应的事件。也就是用户告诉内核要关注哪些事件。
epfd:传入epoll_create的返回值。
op:选项,有EPOLL_CTL_ADD、 EPOLL_CTL_MOD、EPOLL_CTL_DEL。
fd:文件描述符。
event:对应的事件。
其中的结构体为:
③epoll_wait
作用是等待文件描述符对应的事件是否就绪。
epfd:传入epoll_create的返回值。
events:对应的事件。
maxevents:当前关注的文件描述符的最大值。
timeout:等待时间。
3.操作原理
了解了上文这么多函数,其实仅仅是看并没有真正理解到epoll是怎么工作的,下面来讲讲操作原理。
操作系统如何得知网络中的数据到来了?
网卡中得到数据,会向CPU发送硬件中断,调用OS预设的中断函数,负责从外设进行数据拷贝,从外设拷贝内核缓冲区。
epoll_create创建的epoll句柄是什么?
epoll句柄可以理解为epoll模型。
epoll_create会创建一个空的红黑树,一个就绪队列,创建对应的回调函数。
这里的红黑树中的节点存放着要关注的文件描述符和事件等信息,属于用户告诉内核。这里的树等价于当初写poll、select维护的数组。
就绪队列存放着已经就绪的事件。属于内核告诉用户。 回调函数是在当数据到来发生硬件中断,os调用中断函数中拷贝之后使用。
该回调函数会依据就绪的数据,获取到对应的文件描述符和对应的事件,依据这两个内容构建一个fd_queue节点,插入到就绪队列中。
这三个合起来为epoll模型。
这个epoll模型是调用epoll_create会创建的,但是为什么返回值是一个文件描述符呢?
我们知道文件描述符其实就是数组的下标,该数组存放着指向的管理文件的结构体的指针。具体的大家可以看我这篇文章。
struct file中就存放着epoll的数据结构。
由文件描述符找到文件的结构体,就可以找到红黑树,以及就绪队列等关于epoll的数据。
epoll_ctl则是来维护这个红黑树的,负责增加节点删除节点,也就是维护用户告诉内核信息的函数。
epoll_wait则是来从内核中的就绪队列拿数据,有数据则证明有事件就绪了,以前poll需要便利数组去查看是否有文件描述符对应的事件就绪,现在只用通过检查就绪队列是否有数据就知道是否有文件描述符对应的事件就绪。时间复杂度为O(1)。
三、epoll服务器编写
1.日志打印
Log.hpp:
#include <cstdio>
#include <cstdarg>
#include <cassert>
#include <stdlib.h>
#include <time.h>
#define DEBUG 0
#define NOTICE 1
#define WARNING 2
#define FATAL 3
const char *log_level[] = {"DEBUG", "NOTICE", "WARNING", "FATAL"};
void logMessage(int level, const char *format, ...)
{
assert(level >= DEBUG);
assert(level <= FATAL);
char logInfor[1024];
char *name = getenv("USER");
va_list ap;
va_start(ap, format);
vsnprintf(logInfor, sizeof(logInfor) - 1, format, ap);
va_end(ap);
FILE * out = (level == FATAL) ? stderr : stdout;
fprintf(out,"%s | %u | %s | %s\n",\
log_level[level],\
(unsigned int)time(nullptr),\
name == nullptr ? "Unkown" : name,\
logInfor
);
}
2.TCP服务器
Sock.hpp:
#pragma once
#include <iostream>
#include <fstream>
#include <string>
#include <vector>
#include <cstdio>
#include <cstring>
#include <signal.h>
#include <unistd.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <pthread.h>
#include <cerrno>
#include <cassert>
class Sock
{
public:
static const int gbacklog = 20;
static int Socket()
{
int listenSock = socket(PF_INET, SOCK_STREAM, 0);
if (listenSock < 0)
{
exit(1);
}
int opt = 1;
setsockopt(listenSock, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt));
return listenSock;
}
static void Bind(int socket, uint16_t port)
{
struct sockaddr_in local; // 用户栈
memset(&local, 0, sizeof local);
local.sin_family = PF_INET;
local.sin_port = htons(port);
local.sin_addr.s_addr = INADDR_ANY;
// 2.2 本地socket信息,写入sock_对应的内核区域
if (bind(socket, (const struct sockaddr *)&local, sizeof local) < 0)
{
exit(2);
}
}
static void Listen(int socket)
{
if (listen(socket, gbacklog) < 0)
{
exit(3);
}
}
static int Accept(int socket, std::string *clientip, uint16_t *clientport)
{
struct sockaddr_in peer;
socklen_t len = sizeof(peer);
int serviceSock = accept(socket, (struct sockaddr *)&peer, &len);
if (serviceSock < 0)
{
// 获取链接失败
return -1;
}
if (clientport)
*clientport = ntohs(peer.sin_port);
if (clientip)
*clientip = inet_ntoa(peer.sin_addr);
return serviceSock;
}
};
3.Epoll
①雏形
EpollServer.hpp:
#pragma once
#include <iostream>
#include <string>
#include <cstdlib>
#include "log.hpp"
using namespace std;
class EpollServer
{
public:
EpollServer(uint16_t port)
: port_(port), listensock_(-1), epfd_(-1)
{
}
void InitEpollServer()
{
}
void RunServer()
{
}
~EpollServer()
{
if(listensock_ != -1) close(listensock_);
if(epfd_ != -1) close(epfd_);
}
private:
int listensock_;
int epfd_;
uint16_t port_;
};
epoll.cc:
#include "EpollServer.hpp"
#include "Sock.hpp"
#include "Log.hpp"
#include <memory>
void Usage(string process)
{
cout << "Please entry" << process << " port" << endl;
}
int main(int argv, char **argc)
{
if (argv != 2)
{
Usage(argc[0]);
exit(1);
}
unique_ptr<EpollServer> epoll(new EpollServer(atoi(argc[1])));
epoll->InitEpollServer();
epoll->RunServer();
return 0;
}
②InitEpollServer 与 RunServer
代码:
void InitEpollServer()
{
listensock_ = Sock::Socket();
Sock::Bind(listensock_, port_);
Sock::Listen(listensock_);
epfd_ = epoll_create(gsize);
if (epfd_ < 0)
{
logMessage(FATAL, "%d:%s", errno, strerror(errno));
exit(1);
}
logMessage(DEBUG, "epoll_creatr success,epoll模型创建成功,epfd: %d", epfd_);
}
void RunServer()
{
// 1.先添加listensock_到epoll模型中
struct epoll_event ev;
ev.events = EPOLLIN;
ev.data.fd = listensock_;
int a = epoll_ctl(epfd_, EPOLL_CTL_ADD, listensock_, &ev);
if (a != 0)
{
logMessage(FATAL, "%d:%s", errno, strerror(errno));
exit(1);
}
struct epoll_event revs[num];
int timeout = 10000;
while (1)
{
int n = epoll_wait(epfd_, revs, num, timeout);
switch (n)
{
case 0:
// timeout
cout << "timeout ... : " << (unsigned int)time(nullptr) << endl;
break;
// error
cout << "epoll_wait error : " << strerror(errno) << endl;
case -1:
break;
default:
// 等待成功
cout<<"event 到来"<<endl;
break;
}
}
}
现象:
③HandlerEvent
下面开始处理就绪事件。首先,我们定义一个成员变量它的类型是function<int(int)>,并初始化,
我们将自己想要对文件描述符处理的函数传入在类的实例化的过程中,并在HandlerEvent函数中调用该函数。
int myfuc(int sock)
{
// 这里bug,TCP基于流式,如何保证一次将数据读取完毕,一会解决
char buff[1024];
int sz = recv(sock, buff, sizeof buff -1, 0);
if (sz > 0)
{
buff[sz] = 0;
logMessage(DEBUG, "client[%d]:%s", sock, buff);
return sz;
}
return sz;
}
int main(int argv, char **argc)
{
if (argv != 2)
{
Usage(argc[0]);
exit(1);
}
unique_ptr<EpollServer> epoll(new EpollServer(atoi(argc[1]), myfuc));
epoll->InitEpollServer();
epoll->RunServer();
return 0;
}
void HandlerEvent(struct epoll_event *revs, int n)
{
for (int i = 0; i < n; i++)
{
int sock = revs[i].data.fd;
uint32_t revent = revs[i].events;
if (revent & EPOLLIN)
{
// IO
if (sock == listensock_)
{
// 监听套接字
string clientip;
uint16_t clientport = 0;
int iosock = Sock::Accept(listensock_, &clientip, &clientport);
if (iosock < 0)
{
logMessage(FATAL, "Sock error , errno : %d :%s", errno, strerror(errno));
continue;
}
// 托管给epoll
struct epoll_event ev;
ev.data.fd = iosock;
ev.events = EPOLLIN;
int a = epoll_ctl(epfd_, EPOLL_CTL_ADD, iosock, &ev);
assert(a == 0);
(void)a;
}
else
{
// 其他套接字
int n = fuc_(sock);
if (n == 0 || n < 0)
{
int x = epoll_ctl(epfd_, EPOLL_CTL_DEL, sock, nullptr);
assert(x == 0);
(void)x;
close(sock);
logMessage(DEBUG,"clinet[%d] exit",sock);
}
}
}
else
{
// epollout 后续处理
}
}
}
现象:
四、Epoll的工作模式
1.LT模式与ET模式
LT(level trigger)水平触发,ET(edge trigger)边缘触发。
LT模式通俗来讲,只要底层有数据,就一直通知上层。
ET模式,只要底层有数据且是从无到有,从有到多,发生变化时才会通知上层。
例如:你对妈妈说今天午饭LT模式,当妈妈将饭做好,等你吃饭时,并且一直在喊你,宝贝儿子快来吃饭,直到你去吃饭。这次你对妈妈说今天午饭ET模式,当妈妈将饭做好,饭从无到有,此时妈妈叫你吃饭,你没有去,此后饭的量恒定没有变化,妈妈就再也不会通知你。
这两个模式哪一个更为高效呢?ET模式更为高效。以TCP通信来讲,并没有多少数据的到来,底层却一直在提醒上层,多数的提醒的都是在提醒同一个数据就绪。
LT模式,只要有数据就会提醒,我们可以先去处理其他业务,等某次提醒时再处理底层的数据。但ET模式,数据到来之后没有变化,就再也不会提醒你,所以只能在提醒时就处理数据,要不后面再也没有提醒,这就倒逼程序员一次将数据读完。
如何知道底层数据已经读完了?只有不断的去调用recv、read函数,如果报错了证明数据已经读完了。数据已经读完了,但最后一次,没数据了,read却阻塞住了。
所以在ET模式下,所有的文件描述符都要设置成非阻塞。
epoll默认就是LT模式。
2.基于LT模式的epoll服务器
①整体框架
整体代码已经上传到gitee上,配合整体代码观看,更加直观便捷。
下面我们基于上文的epoll服务器,但不同与上文,大家请往下看。
首先先建立一个类,该类将文件描述符和回调方法结合。
Tcpserver.hpp:
class Connection;
using fuc_t = function<int(Connection *)>;
class Connection
{
public:
// 文件描述符
int _sock;
Tcpserver *_ptr;
// 自己的接受和发送缓冲区
string _inbuff;
string _outbuff;
// 回调函数
fuc_t _readfuc;
fuc_t _writefuc;
fuc_t _exceptfuc;
public:
Connection(int sock, Tcpserver *ptr) : _sock(sock), _ptr(ptr)
{
}
~Connection()
{
}
void SetReadfuc(fuc_t fuc)
{
_readfuc = fuc;
}
void SetWritefuc(fuc_t fuc)
{
_writefuc = fuc;
}
void SetExceptfuc(fuc_t fuc)
{
_exceptfuc = fuc;
}
};
Tcpserver.hpp:
class Tcpserver
{
public:
Tcpserver(int port)
{
// 网络
_listensock = Sock::Socket();
Sock::Bind(_listensock, port);
Sock::Listen(_listensock);
// epoll
_epfd = Epoller::CreateEpoll();
// add事件
Epoller::Addevent(_epfd, _listensock, EPOLLIN | EPOLLET);
// 将listensock匹配的connection方法添加到unordered_map中
auto iter = new Connection(_listensock, this);
iter->SetReadfuc(std::bind(&Tcpserver::Accepter, this, std::placeholders::_1));
_conn.insert({_listensock, iter});
// 初始化就绪队列
_revs = new struct epoll_event[_revs_num];
}
int Accepter(Connection *conn)
{
string clientip;
uint16_t clientport;
int sockfd = Sock::Accept(conn->_sock, &clientip, &clientport);
if (sockfd < 0)
{
logMessage(FATAL, "accept error");
return -1;
}
logMessage(DEBUG, "Get a new connect : %d", sockfd);
AddConn(sockfd, EPOLLIN | EPOLLET);
return 0;
}
bool SockinConn(int sock)
{
auto iter = _conn.find(sock);
if (iter == _conn.end())
{
return false;
}
else
{
return true;
}
}
void AddConn(int sock, uint32_t event)
{
// 将文件描述符加入epoll模型中
Epoller::Addevent(_epfd, sock, event);
// 将文件描述符匹配的connection,也加入map中
_conn.insert({sock, new Connection(sock, this)});
logMessage(DEBUG, "将文件描述符匹配的connection加入map成功");
}
void Dispatcher()
{
// 获取就绪事件
int n = Epoller::GetReadyFd(_epfd, _revs, _revs_num);
// logMessage(DEBUG, "GetReadyFd,epoll_wait");
// 事件派发
for (int i = 0; i < n; i++)
{
int sock = _revs[i].data.fd;
uint32_t revent = _revs[i].events;
if (EPOLLIN & revent)
{
// 先判空
if (SockinConn(sock) && _conn[sock]->_readfuc)
{
// 该文件描述符对应的读方法
_conn[sock]->_readfuc(_conn[sock]);
}
}
if (EPOLLOUT & revent)
{
// 先判空
if (SockinConn(sock) && _conn[sock]->_writefuc)
{
// 该文件描述符对应的写方法
_conn[sock]->_writefuc(_conn[sock]);
}
}
}
}
void Run()
{
while (1)
{
Dispatcher();
}
}
~Tcpserver()
{
if (_listensock != -1)
close(_listensock);
if (_epfd != -1)
close(_epfd);
delete[] _revs;
}
private:
// 1.网络sock
int _listensock;
// 2.epoll
int _epfd;
// 3.将epoll与上层代码结合
unordered_map<int, Connection *> _conn;
// 4.就绪事件列表
struct epoll_event *_revs;
// 5.就绪事件列表大小
static const int _revs_num = 64;
};
Epoller.hpp:
#pragma once
#include <iostream>
#include <string>
#include <cstdlib>
#include <cstring>
#include <sys/epoll.h>
#include "Log.hpp"
using namespace std;
class Epoller
{
public:
static int CreateEpoll()
{
int size = 128;
int epfd = epoll_create(size);
if (epfd < 0)
{
logMessage(FATAL, "%d:%s", errno, strerror(errno));
exit(1);
}
return epfd;
}
static bool Addevent(int epfd, int sock, uint32_t event)
{
struct epoll_event ev;
ev.data.fd = sock;
ev.events = event;
int n = epoll_ctl(epfd, EPOLL_CTL_ADD, sock, &ev);
if (n != 0)
{
logMessage(FATAL, "%d:%s", errno, strerror(errno));
return false;
}
return true;
}
static int GetReadyFd(int epfd, struct epoll_event evs[], int num)
{
// 阻塞式
int n = epoll_wait(epfd, evs, num, -1);
if (n == -1)
{
logMessage(FATAL, "%d:%s", errno, strerror(errno));
}
return n;
}
};
Epoll.cc:
#include "Tcpserver.hpp"
#include "Sock.hpp"
#include <memory>
using namespace std;
void Usage(string process)
{
cout << "Please entry" << process << " port" << endl;
}
int main(int argv, char **argc)
{
if (argv != 2)
{
Usage(argc[0]);
exit(1);
}
unique_ptr<Tcpserver> ep(new Tcpserver(atoi(argc[1])));
ep->Run();
return 0;
}
现象:
当然还没有写完。
结合整体代码看:①通过Tcpserver的构造函数,先创建网络套接字,建立epoll模型②监听listen套接字,并为listen设置对应的connect类设置读方法③将listen套接字添加到epoll模型中④通过epoll_wait等待事件就绪⑤当listen套接字就绪时,调用回调函数其中accept来获取新连接的到来。
同时,因为我们今天写的是ET模式,所以要将文件描述符设置为非阻塞式,即使用fcntl。
目前代码只写到了这里,想纵观全貌更加的详细请看我的gitee。
下面继续增加内容,新的连接到来,要为新的连接增加对应的方法。
并且要进行序列化与反序列化,因为我们的服务器是基于TCP的流式读取,每次读取我们确保不了读上来的数据是完整的数据,所以要做处理。
当前的处理为我们读上的数据如同:112233X1213Xadasd。‘X’作为分隔符,我们需要读上来的数据,将数据进行分割。
同样建立一个回调方法,在read之后对数据进行分割通过分割,在此之前对它进行初始化。
using callbcak_t = function<int(Connection *, string &)>;
int HandlerPro(Connection *conn, string &message)
{
// 我们能保证走到这里一定是完整的报文,已经解码
// 接下来是反序列化
cout << "获取request : " << message <<"剩余的信息是"<<conn->_inbuff<<endl;
}
int main(int argv, char **argc)
{
if (argv != 2)
{
Usage(argc[0]);
exit(1);
}
unique_ptr<Tcpserver> ep(new Tcpserver(HandlerPro, atoi(argc[1])));
ep->Run();
return 0;
}
int TcpRecver(Connection *conn)
{
// 对普通套接字读取
while (true)
{
char buff[1024];
ssize_t sz = recv(conn->_sock, buff, sizeof(buff) - 1, 0);
if (sz > 0)
{
buff[sz] = '\0';
conn->_inbuff += buff;
}
else if (sz == 0)
{
logMessage(DEBUG, "client quit");
}
else if (sz < 0)
{
if (errno == EINTR)
{
// 因为信号导致IO关闭,但数据还没有读完
continue;
}
else if (errno == EAGAIN || errno == EWOULDBLOCK)
{
// 读完了
break;
}
else
{
// 读取出错
}
}
}
// 本轮读取完毕
// 将读取上来的 如:xxxxx/3xxxxx/3xxx/3
// 分为 xxxxx 、xxxxx、xxx
vector<string> result;
PackageSplit(conn->_inbuff, &result);
for (auto &message : result)
{
_cb(conn, message);
}
return 0;
}
现象:
②处理BUG
我们要解决Accept函数的BUG,因为我们当前是ET模式,如果当前有大量的连接来,系统只会通知上层一次,而只进行一次调用显然是不对的,会导致读不上其他到来的链接。所以此时我们要进行循环读取,那循环读取时什么时候停止呢?要根据Accept函数中的accept函数调用失败时设置的errno来判别。我们来看下究竟errno会被设置成什么。
EAGAIN和EWOULDBLOCK,意思为,当前的文件描述符被置为非阻塞的且当前没有可接受的连接。意味着已经将当前到来的连接读完了。
EINTR表示当前的系统调用被一个捕捉到的信号中断在一个有效的连接到来之前。
这三个宏值得我们关注,剩下的宏都是表明accept出错了,所以我们可以这样修改函数。
int Accepter(Connection *conn)
{
while (1)
{
string clientip;
uint16_t clientport;
int sockfd = Sock::Accept(conn->_sock, &clientip, &clientport);
if (sockfd < 0)
{
if (errno == EINTR) // 被信号中断
continue;
else if (errno == EAGAIN || errno == EWOULDBLOCK) // 读取结束
break;
else
{
// 出错
logMessage(FATAL, "accept error");
return -1;
}
}
logMessage(DEBUG, "Get a new connect : %d", sockfd);
AddConn(sockfd, EPOLLIN | EPOLLET);
}
return 0;
}
③优化结构
我们会发现被标注的代码,与下文的AddConn函数功能具有相似性,为了避免耦合性更高,我们将这段代码移到AddConn函数中。
void AddConn(int sock, uint32_t event, fuc_t readfuc, fuc_t writefuc, fuc_t exceptfuc)
{
if (event & EPOLLET)
Util::SetNonBlock(sock);
// 将文件描述符加入epoll模型中
Epoller::Addevent(_epfd, sock, event);
// 将文件描述符匹配的connection,也加入map中
Connection *conn = new Connection(sock, this);
conn->SetReadfuc(readfuc);
conn->SetWritefuc(writefuc);
conn->SetExceptfuc(exceptfuc);
// conn->SetReadfuc(std::bind(&Tcpserver::TcpRecver, this, std::placeholders::_1));
// conn->SetWritefuc(std::bind(&Tcpserver::TcpSender, this, std::placeholders::_1));
// conn->SetExceptfuc(std::bind(&Tcpserver::TcpExcepter, this, std::placeholders::_1));
_conn.insert({sock, conn});
logMessage(DEBUG, "将文件描述符匹配的connection加入map成功");
}
改了AddConn函数,还需对其他涉及到此函数的地方大动干戈。
Tcpserver(callbcak_t cb, int port) : _cb(cb)
{
// 网络
_listensock = Sock::Socket();
Util::SetNonBlock(_listensock);
Sock::Bind(_listensock, port);
Sock::Listen(_listensock);
// epoll
_epfd = Epoller::CreateEpoll();
// 添加listen事件
AddConn(_listensock, EPOLLIN | EPOLLET,
std::bind(&Tcpserver::Accepter, this, std::placeholders::_1), nullptr, nullptr);
// // add事件
// Epoller::Addevent(_epfd, _listensock, EPOLLIN | EPOLLET);
// // 将listensock匹配的connection方法添加到unordered_map中
// auto iter = new Connection(_listensock, this);
// iter->SetReadfuc(std::bind(&Tcpserver::Accepter, this, std::placeholders::_1));
// _conn.insert({_listensock, iter});
// 初始化就绪队列
_revs = new struct epoll_event[_revs_num];
}
int Accepter(Connection *conn)
{
while (1)
{
string clientip;
uint16_t clientport;
int sockfd = Sock::Accept(conn->_sock, &clientip, &clientport);
if (sockfd < 0)
{
if (errno == EINTR) // 被信号中断
continue;
else if (errno == EAGAIN || errno == EWOULDBLOCK) // 读取结束
break;
else
{
// 出错
logMessage(FATAL, "accept error");
return -1;
}
}
logMessage(DEBUG, "Get a new connect : %d", sockfd);
AddConn(sockfd, EPOLLIN | EPOLLET,
std::bind(&Tcpserver::TcpRecver, this, std::placeholders::_1),
std::bind(&Tcpserver::TcpSender, this, std::placeholders::_1),
std::bind(&Tcpserver::TcpExcepter, this, std::placeholders::_1));
}
return 0;
}
④异常处理
就绪的文件描述符挂断了,就绪的文件描述符出错了怎么办。我们统一起来转为文件描述符
读事件或写事件就绪,必定在读时或写时出错,我们转而去那时处理异常。
接下来,我们呢要对异常处理函数编写。
代码:
int TcpExcepter(Connection *conn)
{
// 处理普通套接字异常
// 0.检测
if (!SockinConn(conn->_sock))
return -1;
// 1.移除事件
Epoller::DelEvent(_epfd, conn->_sock);
logMessage(DEBUG, "remove epoll event");
// 2.关闭文件描述符
close(conn->_sock);
logMessage(DEBUG, "close fd :%d", conn->_sock);
// 3.删除map中的sock对应的conn
// delete conn;
delete _conn[conn->_sock];
logMessage(DEBUG, "delete conn object success");
// 4.去掉sock和conn的映射关系 上一步只是delete掉了对象,但是映射关系还在
_conn.erase(conn->_sock);
logMessage(DEBUG, "erase conn from map success");
}
现象:
④序列化与反序列化
何为序列化?何为反序列化?
我们在网络传输时以字符串形式发送,传输时以二进制形式。那我想要给对方发送结构体怎么办,则需要将结构体转换为字符串形式,这个过程称为序列化;对方收到字符串,通过反序列化就可以得到结构体。
我们今天的结构体如图所示:
struct Request
{
int _x;
int _y;
char _op;
};
struct Response
{
int _result;
int _exitcode;
};
客户端发送一串字符串我们接受到之后,通过反序列化转换为结构体。
bool Parser(string &in, Request *out)
{
// 反序列化
// 1 + 1, 2 * 4, 5 * 9, 6 *1
std::size_t spaceOne = in.find(SPACE);
if (std::string::npos == spaceOne)
return false;
std::size_t spaceTwo = in.rfind(SPACE);
if (std::string::npos == spaceTwo)
return false;
std::string dataOne = in.substr(0, spaceOne);
std::string dataTwo = in.substr(spaceTwo + SPACE_LEN);
std::string oper = in.substr(spaceOne + SPACE_LEN, spaceTwo - (spaceOne + SPACE_LEN));
if (oper.size() != 1)
return false;
// 转成内部成员
out->_x = atoi(dataOne.c_str());
out->_y = atoi(dataTwo.c_str());
out->_op = oper[0];
return true;
}
void Serialize(Response &in, string *out)
{
// 序列化
// "exitCode_ result_"
std::string ec = std::to_string(in._exitcode);
std::string res = std::to_string(in._result);
*out = ec;
*out += SPACE;
*out += res;
*out += CRLF;
}
接下来,去文件描述符对应的读方法,调用该函数。
Response Calculate(Request &req)
{
Response resp = {0,0};
switch (req._op)
{
case '+':
resp._result = req._x + req._y;
break;
case '-':
resp._result = req._x - req._y;
break;
case '*':
resp._result = req._x * req._y;
break;
case '/':
{
if (req._y == 0)
{
resp._exitcode = 1; // 1 除零错误
resp._result = INT32_MAX;
}
else
resp._result = req._x / req._y;
break;
}
case '%':
{
if (req._y == 0)
{
resp._exitcode = 2; // 2 模零错误
resp._result = INT32_MAX;
}
else
resp._result = req._x % req._y;
break;
}
default:
resp._exitcode = 3; // 非法输入
break;
}
return resp;
}
int HandlerPro(Connection *conn, string &message)
{
// 我们能保证走到这里一定是完整的报文,已经解码
cout << "获取request : " << message << endl;
// 1 * 1
// 接下来是反序列化
Request req;
if (Parser(message, &req) == false)
{
return -1;
}
// 业务处理
Response resp = Calculate(req);
// 序列化
string out;
Serialize(resp, &out);
// 发送给client
conn->_outbuff += out;
// 发送
}
能不能直接调用send方法无脑的直接向客户端发送呢?
首先要知道写的缓冲区是否已满,如何检测缓冲区已满?
在LT模式中,当我们想写时只需要将对应的文件描述符所对应的EPOLLOUT添加事件中,在我们今天的编写代码中,在whlie循环中的事件检测中,当检测到是EPOLLOUT事件时,我们就可以在回调函数中调用send函数。
在ET模式中,我们也可以使用上面的方法,但是ET模式追求高效,所以一般会直接发送数据,如果数据发送完了,那就可以结束了;如果没有发送完,缓冲区已满,就会选择去拜托EPOLL去完成后续任务。
int HandlerPro(Connection *conn, string &message)
{
// 我们能保证走到这里一定是完整的报文,已经解码
cout << "---------------" << endl;
// 1 * 1
// 接下来是反序列化
cout << "获取request : " << message << endl;
Request req;
if (Parser(message, &req) == false)
{
return -1;
}
// 业务处理
Response resp = Calculate(req);
// 序列化
string out;
Serialize(resp, &out);
// 发送给client
conn->_outbuff += out;
conn->_writefuc(conn);
if (conn->_outbuff.empty())
{
if (conn->_outbuff.empty() == 0)
conn->_ptr->ModSockEvent(conn->_sock, true, false);
else
conn->_ptr->ModSockEvent(conn->_sock, true, true);
}
// // 发送
// // conn->_ptr->ModSockEvent(conn->_sock, true, true);
cout << "---------------" << endl;
}
void ModSockEvent(int sock, bool read, bool write)
{
uint32_t event = 0;
event |= read ? EPOLLIN : 0;
event |= write ? EPOLLOUT : 0;
Epoller::ModEvent(_epfd, sock, event);
}
static bool ModEvent(int epfd, int sock, uint32_t event)
{
struct epoll_event ev;
ev.data.fd = sock;
ev.events = event;
int n = epoll_ctl(epfd, EPOLL_CTL_MOD, sock, &ev);
return n == 0;
}
因为,我们截取字符串的函数没有找到我们规定的分隔符的话,就会将最后的一部字符串归到下一次读取时。原来的代码如下所示,所以就会导致每次输入一段字符串,只会输出最后一段的结果,我们只需要稍作修改。
void PackageSplit(string &buff, vector<string> *result)
{
// asdasXdasdaXda
// asdas dasda da
while (true)
{
size_t pos = buff.find(SEP);
if (pos == string::npos)
{
break;
}
result->push_back(buff.substr(0, pos));
buff.erase(0, pos + SEP_SZ);
}
}
修改后:
void PackageSplit(string &buff, vector<string> *result)
{
// asdasXdasdaXda
// asdas dasda da
while (true)
{
size_t pos = buff.find(SEP);
if (pos == string::npos)
{
if (buff.size() < 5)
{
buff.clear();
break;
}
result->push_back(buff.substr(0, buff.size()));
buff.clear();
break;
}
result->push_back(buff.substr(0, pos));
buff.erase(0, pos + SEP_SZ);
}
}
现象:
⑤优化
我们的代码耦合度太高了,数据处理的函数放在了源文件中,我们另起一个头文件,将处理函数放到该头文件中,这样业务处理是业务处理,网络服务是网络服务。
Service.hpp:
#pragma once
#include "Protocol.hpp"
#include <functional>
using service_t = function<Response (Request &req)>;
Response Calculate(Request &req)
{
Response resp = {0, 0};
switch (req._op)
{
case '+':
resp._result = req._x + req._y;
break;
case '-':
resp._result = req._x - req._y;
break;
case '*':
resp._result = req._x * req._y;
break;
case '/':
{
if (req._y == 0)
{
resp._exitcode = 1; // 1 除零错误
resp._result = INT32_MAX;
}
else
resp._result = req._x / req._y;
break;
}
case '%':
{
if (req._y == 0)
{
resp._exitcode = 2; // 2 模零错误
resp._result = INT32_MAX;
}
else
resp._result = req._x % req._y;
break;
}
default:
resp._exitcode = 3; // 非法输入
break;
}
return resp;
}
epoll.cc:
int HandlerProHelp(Connection *conn, string &message,service_t service)
{
// 我们能保证走到这里一定是完整的报文,已经解码
cout << "---------------" << endl;
// 1 * 1
// 接下来是反序列化
cout << "获取request : " << message << endl;
Request req;
if (Parser(message, &req) == false)
{
return -1;
}
// 业务处理
Response resp = service(req);
// 序列化
string out;
Serialize(resp, &out);
// 发送给client
conn->_outbuff += out;
conn->_writefuc(conn);
if (conn->_outbuff.empty())
{
if (conn->_outbuff.empty() == 0)
conn->_ptr->ModSockEvent(conn->_sock, true, false);
else
conn->_ptr->ModSockEvent(conn->_sock, true, true);
}
// // 发送
// // conn->_ptr->ModSockEvent(conn->_sock, true, true);
cout << "---------------" << endl;
return 0;
}
int HandlerPro(Connection *conn, string &message)
{
return HandlerProHelp(conn,message,Calculate);
}
这样在想处理其他业务的时候,只需要将业务处理函数放入Service.hpp中,然后将源文件中调用就行。
⑥Reactor模式与Proactor模式
我们今天使用的是Reactor模式。
Reactor模式:
Linux系统中最常用的反应器模式。
半同步半异步。
即负责事件的派发,有否则IO,或者说业务处理。
Proactor模式:
只负责事件的派发,就绪的事件推送给后台的进程、线程池,不关心处理的细节。
到这里,epoll服务器的编写,应该就告一段落了,尽管还有数不清的BUG,和没有说清楚的知识点,但是总体还是挺完美的,那么感谢观看,我们下次再见。