epoll
- 前言
- 正式开始
- epoll相关的接口
- epoll_create
- epoll_ctl
- epoll_wait
- epoll原理
- 硬件上的数据是怎么交给上层的
- 创建epoll模型
- epoll模型中的红黑树
- epoll中的就绪队列
- 回调方法
- 前面三个接口在模型中的体现
- 一些细节
- 编写epoll服务器
- 小组件
- 正式开始编写
- 对epoll接口进行封装
- epoll_create
- epoll_ctl
- epoll_wait
- 用用刚刚封装的接口
- 小总结
- epoll的优点(和 select 的缺点对应)
- epoll模式
- 小细节
- 编写ET模式服务器(Reactor服务器)
- 完整过程
- 完整代码(做过修改的和新的)
前言
本篇基于前一篇多路转接进行讲解,如果你对于多路转接不太了解,请先看前一篇:
【网络】五中IO模型介绍 + 多路转接中select和poll服务器的简单编写
本篇讲解Epoll,主要内容有:
- 三个Epoll接口
- Epoll原理
- 编写Epoll的LT模式服务器
- 编写Epoll的ET模式服务器
正式开始
官方的说法:epoll是为了处理大量句柄而做了改进的poll。
但改进版的poll都不像poll了,epoll和poll的差别还是很大的。
这句话里面有个名词需要介绍一下。句柄是啥?
百度百科里面是这样说的:句柄(Handle)是一个用来标识对象或者项目的标识符,可以用来描述窗体、文件等,值得注意的是句柄不能是常量。
比如说C语言中文件的FILE*,或者是文件描述付fd等这种能够标定特定文件资源的特定数据对象,就可称其为句柄,是一种统称。
它是在2.5.44内核中被引进的(epoll(4) is a new API introduced in Linux kernel 2.5.44)
Linux2.6下性能最好的多路它几乎具备了之前所说的一切优点,被公认为性能最好的多路转接方法。
想看你内核是多少的话,可以用uname -a命令:
我这里的版本是3.几的,虽然现在内核版本已经更新到5.几了。
现在能接触到的Linux操作系统基本全部支持epoll。
epoll几乎具备了前一篇中select和poll的优点,且缺点也是几乎全部解决,所以epoll是现在主流服务器中性能最好,被人们用的最多的多路转接方案。
epoll相关的接口
select和poll只有一个接口,二者解决的问题就是IO中的等(我前一篇中说了IO = 等 + 拷贝),epoll核心工作也是等,只不过为了更好的等,设计了三个接口,分别是epoll_create、epoll_ctl、epoll_wait,那么我这里先简单介绍一下这三个接口,这里简单介绍就是带各位过一下这三个接口,真正去理解这些接口是我等会讲epoll原理的时候。
epoll_create
这个函数作用是创建一个epoll模型,其实返回的就是一个文件描述符,关于epoll模型的文件描述符。关于epoll模型是啥东西得等后面讲epoll原理的时候再细说,现在只要知道用epoll前得先调用epoll_create创建一个epoll模型就行。
size参数一般写成512或者256就行了,其实这个参数目前已经废弃了,man手册上是这样说的:
Since Linux 2.6.8, the size argument is ignored, but must be greater than zero
没有把这个参数去掉是为了兼容性,因为以前的老服务器都用了这个参数,不过是现在不用了,如果去掉,那老的服务器就完犊子了。
epoll_ctl
这个接口是对epoll模型进行某些操作用的。
epfd就是刚刚epoll_create创建出来的epoll模型的fd。
op代表的是你想对该epoll模型做什么操作(增删改什么的),比如说可能向epoll模型中添加一个特定的文件描述符(第三个参数fd)上对于特定事件(第四个参数event)的关心。
ok,点到为止,等会原理再细讲。
epoll_wait
这个是在epfd中获取已经就绪文件描述符上的事件。
timeout和poll中的timeout一毛一样。1000就是1s超时,0就是非阻塞,-1就是阻塞。
返回值为已经就绪的文件描述符的个数。
三个简单介绍完了,相信初学epoll的同学一定很懵,没关系,刚刚都说了只是简单过一下,原理细讲这三个。下面就来讲讲epoll的工作原理来帮助理解一下。
epoll原理
先简单回忆一下前一篇中讲的select和poll。
二者都是需要用户自己维护一个数组来保存fd与特定的事件的。
二者都要对数组进行遍历
二者工作模式:
- 通过select或poll,用户告诉内核,你要帮我关心哪些fd上的哪些事件。
- 通过select或poll的输出型参数和返回值,内核告诉用户,哪些fd上的事件已经就绪。
再来说epoll。
硬件上的数据是怎么交给上层的
比如说os是怎么知道网卡里面有数据或键盘上有用户输入了呢?
看图:
能不能让os定期的去查看一下硬件是否有数据了?
不太行,os挺忙的,去查之前可能有自己的事情干,查之后也可能有事情干,这就意味着这种方法os处理硬件的数据一定是来不及的,所以不行。
实际上是采用硬件中断的方式,网卡一旦有数据了,就会立马通过硬件电路给CPU特定针脚发送一个中断,CPU识别到有中断过来了就立马将在其上面运行的进程剥离下来,并立马切换到内核状态。
os内部有一个中断向量表:
是一个函数指针数组,这个表里面存放有很多驱动方法,针脚是对应有一个序号的,当某个针脚收到传来的电脉冲时会被转化成CPU某个寄存器中的值,假如说这里就是8号帧脚,就会转换成数字8保存到寄存器中,根据寄存器中的值去索引这张表去调用驱动方法(假如说现在的方法就是读取网卡数据),从而把数据搬到os内部,这样就完成了将硬件上的数据搬到软件上,这些接口就是由驱动程序提供的,但是os中的数据还不能直接交给上层,还要经过协议栈来交付。
epoll和这有什么关系呢?有一点,但不多。
创建epoll模型
epoll模型中的红黑树
当我们创建了一个epoll模型后,os会为我们维护一棵红黑树,关于红黑树我前面的博客中是有的,不懂的同学可以先看看:【C++】红黑树模拟实现插入功能(包含旋转和变色) 。
假如说是这样的一棵(当然最开始的时候是一棵空树):
树节点维护的有很多字段,但是最重要的就两个:fd(int类型)和event(uint32_t类型,也就是32位),分别代表你让os关心的文件描述符上的事件。上层不断让os关注新的fd上的事件,os也就会不断添加红黑树节点,所以这棵树解决的是用户告诉内核要让内核关心哪些fd上的事件的问题,也就相当于select / poll中维护的数组,今天有了epoll就不用我们自己再维护数组了,os会自动帮我们维护。
epoll中的就绪队列
os帮我们维护的第二种结构,就绪队列(双向链表)。
开始的时候也是空的,每个链表中的节点最重要的字段为fd和revents,也就是fd上的就绪事件。当有fd的事件就绪时os就会创建一个节点放入到队列中,我们上层想知道哪些fd上的事件就绪时只需要监测一下就绪队列是否为空就行,这样就可以以O(1)的时间复杂度拿到就绪的fd上的事件,所以这个队列解决的就是内核告诉用户哪些fd上的哪些事件已经就绪了的问题。
回调方法
os会在底层驱动中搞一个回调方法,假设为void callback()函数,其做法就是当下层有数据传来(某事件就绪)的时候,对比红黑树中的节点(用户让os关心的事件),结合下层传来的事件,若关心事件与已经发生的时间匹配成功,就会构建一个就绪节点插入到就绪队列的队尾。如果发生的事件为读但是关心的事件为写,那么就不会构建就绪节点。
因为采用的是回调方法,所以就不需要像select或poll那样让os对每一个文件描述符进行频繁的遍历了。
上面这一整套机制就是epoll模型。
其实上面的红黑树和就绪队列是有特定数据结构将二者关联起来了的,这个结构就是eventpoll结构体,这个结构体中就有红黑树的根节点和就绪队列的头节点:
还有一个结构体可以既能表示红黑树中的一个节点,又可以表示就绪队列中的一个节点:
前面三个接口在模型中的体现
比如调用epoll_create就是在构建红黑树的根节点、就绪队列的头、回调机制等。
调用epoll_ctl就是增加、删除、修改红黑树中的节点。
调用epoll_wait就是在特定的epoll模型中(epfd)捞取就绪队列中就绪的事件,再看一下这个函数:
events和maxevents就是和就绪队列有关的东西。
那前面一直讲的epfd有什么用呢?
一个文件描述符,对应的struct file中有一个字段,该字段的类型就是eventpoll*的,根据这个指针就能找到eventpoll对象,然后就能对其中的红黑树和就绪队列做相关的操作了。
再来对比一下epoll模型和刚刚说的select和poll中的3个点:
epoll不用用户自己维护数组,而是os维护一个红黑树。
epoll不需要用户自己对数组遍历,而是os自行在红黑树中找。
epoll通过红黑树来实现用户告诉内核该关心什么,通过就绪队列来实现内核告诉用户哪些事件就绪。
一些细节
细节一
我前面讲红黑树的时候也说过的,红黑树每个节点都是要有一个key值的,这里epoll中的红黑树中fd就是一个天然的key值。
细节二
用户只需要设置关心和获取结果就绪,不用再关系任何对fd与event的管理细节。
细节三
epoll为什么高效呢?
a. 文件描述符管理起来更便捷(红黑树)
b. 监测事件更便捷,不需要再遍历所有的文件描述符。
c. 获取就绪文件描述符更方便,调用epoll_wait直接从下层的就绪队列中拿就行了。
细节四
底层只要有fd就绪了,os自己回给我们构建节点并连入到就绪队列中,上层只需要不断从就绪队列中将数据拿走,就完成了获取就绪事件的任务,拿走后os还可以继续往就绪队列中放就绪事件对应的节点。
一边拿一边放,是不是就是一个生产消费者模型?
答案是是的。想一想,上层用户和os一个拿一个放,会不会同时访问到这个就绪队列?
会。所以说这里的就绪队列就可以看做是一个临界资源。那么会不会出现线程安全问题?
不会,因为epoll在实现的时候已经考虑到这一点了,其内部的实现细节已经保证所有的epoll接口都是线程安全的,所以不必担心线程安全问题。
细节五
如果底层没有就绪事件,我们上层是只能阻塞等待的,这就是为啥epoll_wait参数中还有一个timeout。
编写epoll服务器
逻辑和前面的select和epoll还是很相似的,不过写法上更简单一点,前一篇博客中select服务器把该讲的细节都讲了的,等会我写的时候就不再讲那么些细节了,如果说碰到了新的东西我再细说。
小组件
前面写的小组件:
Sock.hpp(对socket相关接口的封装)
#pragma once
#include "LogMessage.hpp"
#include <iostream>
#include <string>
#include <memory>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>
// 对套接字相关的接口进行封装
class Sock
{
private:
static const int gBackLog = 20;
public:
// 1. 创建套接字
static int Socket()
{
/*先AF_INET确定网络通信*/ /*这里用的是TCP,所以用SOCK_STREAM*/
int listenSock = socket(AF_INET, SOCK_STREAM, 0);
// 创建失败返回-1
if(listenSock == -1)
{
LogMessage(FATAL, _F, _L, "server create socket fail");
exit(2);
}
LogMessage(DEBUG, _F, _L, "server create socket success, listen sock::%d", listenSock);
// 创建成功
return listenSock;
}
// 2. bind 绑定IP和port
static void Bind(int listenSock, uint16_t port, const std::string& ip = "0.0.0.0")
{
sockaddr_in local; // 各个字段填充
memset(&local, 0, sizeof(local));
// 若为空字符串就绑定当前主机所有IP
local.sin_addr.s_addr = inet_addr(ip.c_str());
local.sin_port = htons(port);
local.sin_family = AF_INET;
/*填充好了绑定*/
if(bind(listenSock, reinterpret_cast<sockaddr*>(&local), sizeof(local)) < 0)
{
LogMessage(FATAL, _F, _L, "server bind IP+port fail :: %d:%s", errno, strerror(errno));
exit(3);
}
LogMessage(DEBUG, _F, _L, "server bind IP+port success");
}
// 3. listen为套接字设置监听状态
static void Listen(int listenSock)
{
if(listen(listenSock, gBackLog/*后面再详谈listen第二个参数*/) < 0)
{
LogMessage(FATAL, _F, _L, "srever listen fail");
exit(4);
}
LogMessage(NORMAL, _F, _L, "server init success");
}
// 4.accept接收连接 输出型参数,返回客户端的IP + port
static int Accept(int listenSock, std::string &clientIp, uint16_t &clientPort)
{
/*客户端相关字段*/
sockaddr_in clientMessage;
socklen_t clientLen = sizeof(clientMessage);
memset(&clientMessage, 0, clientLen);
// 接收连接
int serverSock = accept(listenSock, reinterpret_cast<sockaddr*>(&clientMessage), &clientLen);
// 对端的IP和port信息
clientIp = inet_ntoa(clientMessage.sin_addr);
clientPort = ntohs(clientMessage.sin_port);
if(serverSock < 0)
{
// 这里没连接上不能说直接退出,就像张三没有揽到某个客人餐馆就不干了,所以日志等级为ERROR
LogMessage(ERROR, _F, _L, "server accept connection fail");
return -1;
}
else
{
LogMessage(NORMAL, _F, _L, "server accept connection success ::[%s:%d] server sock::%d", \
clientIp.c_str(), clientPort,serverSock);
}
return serverSock;
}
};
LogMessage.hpp(打印日志的)
#pragma once
#include <cstdio>
#include <cstring>
#include <ctime>
#include <cstdarg>
#include <unistd.h>
#include <vector>
// 文件名
#define _F __FILE__
// 所在行
#define _L __LINE__
enum level
{
DEBUG, // 0
NORMAL, // 1
WARING, // 2
ERROR, // 3
FATAL // 4
};
std::vector<const char*> gLevelMap = {
"DEBUG",
"NORMAL",
"WARING",
"ERROR",
"FATAL"
};
#define FILE_NAME "./log.txt"
void LogMessage(int level, const char* file, int line, const char* format, ...)
{
#ifdef NO_DEBUG
if(level == DEBUG) return;
#endif
// 固定格式
char FixBuffer[512];
time_t tm = time(nullptr);
// 日志级别 时间 哪一个文件 哪一行
snprintf(FixBuffer, sizeof(FixBuffer), \
"<%s>==[file->%s] [line->%d] ----------------------------------- time:: %s", gLevelMap[level], file, line, ctime(&tm));
// 用户自定义格式
char DefBuffer[512];
va_list args; // 定义一个可变参数
va_start(args, format); // 用format初始化可变参数
vsnprintf(DefBuffer, sizeof DefBuffer, format, args); // 将可变参数格式化打印到DefBuffer中
va_end(args); // 销毁可变参数
// 往显示器打
printf("%s\t=\n\t=> %s\n\n\n", FixBuffer, DefBuffer);
// 往文件中打
// FILE* pf = fopen(FILE_NAME, "a");
// fprintf(pf, "%s\t==> %s\n\n\n", FixBuffer, DefBuffer);
// fclose(pf);
}
正式开始编写
首先就是三步:
此时设置好listen状态了,不能直接在Start启动服务器中直接调用accept,因为会导致进程阻塞,所以得先将_listenSock添加到Epoll底层的红黑树中。但是此时底层的红黑树、就绪队列等还没创建所以得先调用epoll_create创建epoll模型,然后再将_listenSock添加到Epoll模型的红黑树中。不过我这里不直接调用epoll_create,给epoll相关接口也封装一下。
对epoll接口进行封装
epoll_create
参数size随便给一个大于零的数就行
返回值是失败了返回-1,成功了返回epoll模型对应的文件描述符epfd。
封装:
epoll_ctl
epfd就是epoll_create的返回值。
op有三个:
EPOLL_CTL_ADD
EPOLL_CTL_MOD
EPOLL_CTL_DEL分别是对epoll模型的红黑树中添加节点,修改某个节点,删除某个节点
fd就是你要添加到红黑树节点中的文件描述符,event就是fd对应节点要关心的事件
说一下epoll_event这个类型:
其中epoll_event中的events就和前面poll参数中的event一样,不过是32位的,可以是下面几个宏的集合:
EPOLLIN : 表示对应的文件描述符可以读 (包括对端SOCKET正常关闭);EPOLLOUT : 表示对应的文件描述符可以写;
EPOLLPRI : 表示对应的文件描述符有紧急的数据可读 (这里应该表示有带外数据到来);
EPOLLERR : 表示对应的文件描述符发生错误;
EPOLLHUP : 表示对应的文件描述符被挂断;
EPOLLET : 将EPOLL设为边缘触发(Edge Triggered)模式, 这是相对于水平触发(Level Triggered)来说的.
EPOLLONESHOT:只监听一次事件, 当监听完这次事件之后, 如果还需要继续监听这个socket的话, 需要再次把这个socket加入到EPOLL队列里HUP在不同的平台下稍微有点BUG。ONSHOT用起来意思就是关心一次后就将节点从红黑树中删除。
.
epoll_event中的data是一个联合体对象,其中的四个元素用哪个都行,等会我就用其中的fd。
返回值:成功0,失败-1并设置错误码。
封装:
epoll_wait
epfd还是上面的那个。
events是指你要传一个数组,调用结束后若有就绪事件就会将就绪事件放到你传入的数组中。
maxevents是指你传入的数组最大元素个数,如果说底层就绪的事件个数比你传入的数组元素个数要多,没关系,能取多少取多少,下次调用的时候还能续着上次的继续来。
timeout就是超时时间。
返回值若大于零则指就绪事件的个数,等于0就是超时了,-1表示出错并设置错误码。
封装:
一个小细节,epoll_wait返回值是很有用的,不仅是有几个就绪时间就返回几,select和poll我们自己维护的数组中存放的文件描述符集中就绪事件的位置是没有什么规律的,在epoll这里还有妙用,epoll这里底层会将就绪的n个事件全部放在我们传进去的events数组的前n位,比如说我们传进去的数组总共10个元素,当底层有4个事件就绪的时候会将四个事件全部放在数组的前四位,对应下标就是0、1、2、3这四个位,返回值为4,所以我们上层想要找就绪时间的时候只需要遍历数组的0 ~ n位,而不是像select和poll那样将整个数组都遍历一边,所以epoll每次处理时间的时候效率都会很高。
用用刚刚封装的接口
创建epoll模型,会获得一个epfd,直接在EpollServer中添加一个成员:
那么就可以调用WaitEpoll等连接到来了,不过调用的时候有一个参数events用来表示就绪的事件,这个就绪的事件在别的地方也是会用到的,所以搞一个成员:
构造函数初始化:
析构的时候也得将空间释放掉:
然后就是调用WaitEpoll来接收连接:
上面搞的是1s超时。我注释掉的那条语句其实有其他用,如果说出现底层就绪事件很多,但是上层给的数组太小了,可以在res == 数组大小的时候给数组扩个容,这里我懒得写了,感兴趣的同学可以自行实现一下。
测试一下:
连一个:
会一直打印get link,因为我这里还没有写接收连接的逻辑,连接没接收,listenSock就一直是就绪的状态。
再演示一下非阻塞的:
运行:
会不断打印time out,非阻塞就是这样,我前面博客也讲过了,就不细说了。
再演示一下阻塞的:
运行:
连接上客户端了之后还是死循环打印,因为没有接收连接:
但是刚开始的时候只有listenSock,建立好连接后,不能直接调用read这样的函数来读取数据,因为会导致进程阻塞,所以得先放到epoll模型中,让epoll去监测新建立起来的连接就绪没有。
所以随着后面连接慢慢增多,会再向epoll中添加新的文件描述符(这里先暂时考虑只读取数据的sock,等后面讲reactor的时候再写的完整一点)。那么当后面有了新的负责通信的文件描述符的时候也就会出现两种不同功能的文件描述符,一种是专门负责接收连接的,一种是专门负责接收对端发来的数据的。
所以要分情况来考虑。和前面select和poll一样,专门搞两个函数,一个负责接收连接,一个负责接收数据。
接收连接:
接收数据:
这里接收数据有个小细节,当对端关闭连接或者读取失败,需要本端关闭连接,关闭连接的时候得先在Epoll中去掉对sock的关心,因为epoll对于sock的三个操作都是在sock有效的前提下才能进行的,如果是先关掉sock会很容易导致epoll去掉关心的时候出现错误,所以得先去掉节点再close(sock)才是正确的。
把前面的Start改一下:
其中的Handler:
连接三个:
发消息:
退出:
成功。
如果你想对客户端发来的数据做处理,可以用一下function包装器在类中搞一个类型:
然后成员中搞一个对象,专门用来处理用户发来的数据:
在构造函数那里初始化一下:
构造对象的时候显示传一个:
这里我就不写什么处理了,就简单打印一下。再说一遍,这里只是为了见见猪跑,看看epoll咋用的,等后面讲reactor了再详细讲解其中的一些细节,比如说TCP的粘包问题,协议定制等等。
运行一下:
成功的。
这里建议的epoll服务器就写到这里,等会写一个更加完善的ET模式的epoll服务器。
上方完整代码如下:
Epoll.hpp
#pragma once
#include <sys/epoll.h>
#include "LogMessage.hpp"
#include <iostream>
class Epoll
{
public:
// 创建epoll模型
static int CreateEpoll()
{
// 参数size随便给一个大于零的数就行
int res = epoll_create(128);
if(res == -1)
{
LogMessage(FATAL, "create epoll fail, errno[%d]::%s", errno, strerror(errno));
exit(5); // 这里是为了和我Sock.hpp连着所以给的5,前面Sock最后一个退出码是4
}
// 返回epoll模型的文件描述符
return res;
}
// 增删改
static bool CtlEpoll(int epfd, int op, int fd, uint32_t events)
{
// 这里用的时候是光传一个events
epoll_event event;
event.events = events; // 只需要一个events就行
event.data.fd = fd; // event data中的fd直接用参数中的fd
int res = epoll_ctl(epfd, op, fd, &event);
if(res == -1)
{
LogMessage(FATAL, "epoll_ctl fail, errno[%d]::%s", errno, strerror(errno));
exit(6);
}
return res == 0;
}
// 等待就绪事件
static int WaitEpoll(int epfd, epoll_event *events, int maxevents, int timeout)
{
int res = epoll_wait(epfd, events, maxevents, timeout);
if(res == -1)
{
LogMessage(FATAL, "epoll_wait fail, errno[%d]::%s", errno, strerror(errno));
exit(7);
}
return res;
}
};
EpollServer.hpp
#pragma once
#include "Sock.hpp"
#include "Epoll.hpp"
#include <cassert>
#include <functional>
// 命名空间封装一下
namespace FangZhang
{
#define MAX_EVENTS 100
using func_t = std::function<void(std::string)>; // 等价于typedef std::function<void()> func_t
class EpollServer
{
public:
EpollServer(func_t dataProcess, uint16_t port = 8080)
: _port(port)
, _dataProcess(dataProcess)
{
// socket创建套接字
_listenSock = Sock::Socket();
// bind绑定
Sock::Bind(_listenSock, _port);
// 设置监听状态
Sock::Listen(_listenSock);
// 将listenSock添加到epoll中
_epfd = Epoll::CreateEpoll(); // 创建epoll模型
Epoll::CtlEpoll(_epfd, EPOLL_CTL_ADD, _listenSock, EPOLLIN); // 将listenSock添加到epoll模型中
_revents = new epoll_event[MAX_EVENTS];
_maxRevents = MAX_EVENTS;
}
void Start()
{
// -1阻塞
int timeout = -1;
while(1)
{
// 等待就绪
int res = Epoll::WaitEpoll(_epfd, _revents, _maxRevents, timeout);
//if(res == _maxRevents);
if(res == 0)
{ // 超时
std::cout << "time out" << std::endl;
}
else
{ // 处理就绪事件
Handler(res);
}
}
}
~EpollServer()
{
if(_listenSock >= 0) close(_listenSock);
if(_revents) delete[] _revents;
if(_epfd >= 0) close(_epfd);
}
private:
// 处理就绪事件
void Handler(int n)
{
// _renvents数组前n位即为就绪的事件
for(int i = 0; i < n; ++i)
{
// 拿出就绪的事件
uint32_t events = _revents[i].events;
int sock = _revents[i].data.fd;
if(events & EPOLLIN)
{// 读事件就绪
if(sock == _listenSock) Accepter(); // listenSock就去接收连接
else Recver(sock); // 普通通信的sock就去读取数据
}
}
}
void Accepter()
{
std::string clientIP;
uint16_t clientPort;
int sock = Sock::Accept(_listenSock, clientIP, clientPort);
if(sock > 0)
{ // 创建套成功,得要放到epoll模型中
if(Epoll::CtlEpoll(_epfd, EPOLL_CTL_ADD, sock, EPOLLIN))
std::cout << sock << " in epoll" << std::endl;
}
}
// 读取数据
void Recver(int sock)
{
char buff[128];
int res = read(sock, buff, sizeof(buff) - 1);
if(res > 0)
{ // 读到数据
buff[res] = 0;
_dataProcess(buff);
}
else if(res == 0)
{ // 对端关闭连接
printf("client[%d] closed, me too\n", sock, buff);
bool res = Epoll::CtlEpoll(_epfd, EPOLL_CTL_DEL, sock, EPOLLIN);
assert(res);
(void)res;
close(sock);
}
else
{ // 读取出错
printf("client[%d] read err, errno[%d]::%s\n", sock, errno, strerror(errno));
bool res = Epoll::CtlEpoll(_epfd, EPOLL_CTL_DEL, sock, EPOLLIN);
assert(res);
(void)res;
close(sock);
}
}
private:
int _listenSock;
uint16_t _port;
int _epfd;
epoll_event *_revents;
int _maxRevents;
func_t _dataProcess;
};
}
EpollServer.cc
#include "EpollServer.hpp"
#include <memory>
void DataProcess(std::string data)
{
printf("client #%s\n", data.c_str());
}
int main()
{
// 智能指针
std::unique_ptr<FangZhang::EpollServer> pes(new FangZhang::EpollServer(DataProcess));
pes->Start();
return 0;
}
小总结
epoll的使用过程就是三部曲:
调用epoll_create创建一个epoll句柄;
调用epoll_ctl, 将要监控的文件描述符进行注册;
调用epoll_wait, 等待文件描述符就绪;
epoll的优点(和 select 的缺点对应)
-
接口使用方便: 虽然拆分成了三个函数, 但是反而使用起来更方便高效. 不需要每次循环都设置关注的文件描述符, 也做到了输入输出参数分离开
-
数据拷贝轻量: 只在合适的时候调用 EPOLL_CTL_ADD 将文件描述符结构拷贝到内核中, 这个操作并不频繁(而select/poll都是每次循环都要进行拷贝)
-
事件回调机制: 避免使用遍历, 而是使用回调函数的方式, 将就绪的文件描述符结构加入到就绪队列中, epoll_wait 返回直接访问就绪队列就知道哪些文件描述符就绪. 这个操作时间复杂度O(1). 即使文件描述符数目很多, 效率也不会受到影响.
-
没有数量限制: 文件描述符数目无上限
网上有些博客说, epoll中使用了内存映射机制
内存映射机制: 内核直接将就绪队列通过mmap的方式映射到用户态. 避免了拷贝内存这样的额外性能开销.。
这种说法是不准确的. 我们定义的struct epoll_event是我们在用户空间中分配好的内存. 势必还是需要将内核的数据拷贝到这个用户空间的内存中的,用户空间中使我们自己申请出来的,只要是自己申请出来的东西想要访问内核中的数据,就必须从内核空间将数据拷贝到我们申请的空间当中的。不可能说直接将内核的空间让用户直接访问,os是不相信任何用户的。
epoll模式
先不说epoll有啥模式,先来讲个例子。
假如说现在有两个快递员,一个张三,一个李四。假如说二者都是在你宿舍楼下派送快递,不过一天只有一个人。
第一天是张三派发,张三人非常好,如果有你的快递,就会通知你下楼取快递,但是如果你正在和你的舍友开黑,没办法下来,你对张三说等会就下,张三说好的,等了一分钟,张三看你还没下来,又给你打电话,但是你还在打游戏,于是又对张三说马上就下,张三等了一会又给你打了电话,但是你又说等一会,于是张三又等……,只要有你的快递而且你一直不取,就会不断通知你,给你打电话。
第二天是李四派发,李四觉得一直通知太麻烦了,于是只给你说:楼下有你的快递,我只打这一次电话,我马上就要走了,如果你不取你的快递我就带走了。这种情况下你得尽快把手里的事停下来,赶紧下楼去取,不然这次快递没取到又得等下一次。但是如果李四给你打电话之后又来了一个你的快递,李四就会再通知你说:楼下有你一个新的快递,请尽快下来取。
ok,把上面的张三和李四看成两种底层通知的模式,把快递看成数据,把你看作上层,那么:
- 张三:手里只要有你的数据就会一直通知你,这就是LT(level triggered,水平触发)
- 李四:手里有数据首次到达、从无到有、从有到无(变化)的时候才会通知你一次,这就是ET(edge triggered,边缘触发)。
显然我前面写的select、poll、epoll服务器默认情况下都是LT模式的,响应的测试就是有连接到来的时候不去接收连接就会疯狂打印连接到来。因为底层在不断通知你去获取连接。
原则上张三和李四哪一个更高效呢?
李四,因为一个快递员一天打电话的总数是相对比较确定的,李四这种模式能保证一天打出去的所有电话是不重复的(有新快递不算重复),所以如果用户个数无限,那李四可以通知的范围更广,就可以让更多的事件就绪,让用户尽快取走,能处理的IO更多。
小细节
细节一:
我为什么要听李四的(上层为什么要听epoll的ET模式)?
因为如果不取数据,底层就再也不通知了,上层调用就无法再次获取该fd的就绪事件了,也就是没法调用recv了,变相的数据就丢了。这样ET模式就会倒逼程序员,如果数据就绪了就必须把本轮的数据全部取走,否则数据可能丢失。
细节二:
我可以暂时不处理张三的通知事件吗?
可以,因为如果我不取或者我取了一部分,是不用担心的,因为底层还是会告诉上层fd就绪了的,我还有读取的机会。
细节三:
如果是LT模式,我也可以将数据全部读取完毕,这样LT和ET在效率上其实是没有差别的。
但为什么说ET模式更高效呢?
- 更少的返回次数。
- ET模式会倒闭程序员尽快将接收缓冲区中中的数据全部取走,应用层尽快的取完了缓冲区中的数据,那么在单位时间下,该模式下工作的服务器就可以在一定程度上给发送方一个更大的接收缓冲区(相比于LT),所以对方就可以有更大的滑动窗口,这样发送方就可以一次向服务器发送更多的数据,从而提高了IO的吞吐量。
细节四:
我们怎么保证我们把本轮数据全部读取完毕了呢?
讲个例子。
假如说你面前有一个不透明的盒子,里面有很多石子(石子大小不定),你每次可以伸手下去摸石子,你如何确定这一次把数据取完了呢?假如说剩下一个石子了,但是你是不知道的,你把这个石子取走了,盒子就空了,但是你并不知道盒子已经空了,但是你伸手下去摸的时候就知道盒子空没空了。盒子空了,就是取了。
所以说我们读取数据的时候必须一直循环读取,在最后一次正常读取完毕的时候,我们势必还要进行下一次读取(无法确定是否读取完成),比如说5KB的数据,一次读1KB,分五次读取完毕后你还要读取第六次,因为不确定第五次读取完了没有。这样第六次读取的时候必然会阻塞,所以为了避免这个问题,在ET模式下工作,我们的sock必须要被设置成非阻塞的,只要读取出错(EAGIN,我前一篇博客讲了,不懂的同学先看一下)就证明读取完毕了。
所以在ET模式下的sock必须是非阻塞的。
而且为了能正常工作,常规用来通信的sock必须要有自己独立的接收缓冲区,将不完整的数据先攒着,用来解决粘包等问题。这在我前面讲协议的那篇博客中也是说过的。同时也要有一个发送的缓冲区来解决写入可能出现的问题。
下面就来写一个ET模式的epoll服务器。
编写ET模式服务器(Reactor服务器)
完整过程
还是前面的Sock和LogMessage,代码我就不给了。
老样子:
下面该创建epoll模型了,我再重新将Epoll相关的接口封装一下:
这里直接在EpollServer中定义一个Epoll的成员:
然后下面就要将listenSock添加到epoll模型中了,这里专门搞了一个添加文件描述符的接口,后面普通通信的sock也能复用。
但为了方便通信,再添加一个类,用来负责服务器和客户端间的连接,刚刚说的,一个接收缓冲区,一个发送缓冲区,还有对应通信的sock,每个sock通信的时候要处理的事件有读、写、异常(像select那样分类),所以专门搞三个函数对象来让对应sock处理这三种事件,只要创建一个通信的套接字就开辟一个对象,用来专门通信:
然后就可以将listenSock添加到epoll模型中了:
不过我创建一个connection对象,得要将其保存起来,不然等会调用回调函数的时候就找不到对应的connection对象了,所以在成员变量中搞一个哈希(sock : connection*),找起来效率更高:
此时再将sock添加到epoll模型中:
还有一点非常重要,刚刚说了ET模式下的sock必须要搞成非阻塞模式,所以函数开头可以将其设置为非阻塞,在Sock.hpp中添加一个函数:
这个函数我在前一篇博客也介绍了,这里不细说了。
添加sock函数:
对于读事件的关心,默认情况下都是打开的,而对于写事件的关心,都是需要的时候才打开。写事件
就绪对应的就是发送缓冲区没满,那么服务器刚开始发送缓冲区肯定是空的,那么就意味着发送缓冲区一定是就绪的,如果你代码写的不好可能会导致给客户端发送一些垃圾数据,所以些时间按需设置,等会也会有代码演示。
添加listenSock前还要决定一下listenSock的读回调函数是啥,所以需要再写一个Accepter:
这里就先测试一下listenSock是否添加成功了,先不写什么将通信的sock添加到Epoll模型中。
此时就可以在构造函数中添加listenSock了:
后面两个函数对象给成空,先暂时不考虑。
但是有一个问题,这里添加会失败,因为Accepter是当前类内的函数,参数中有一个this指针,和func_t中函数的类型不匹配,怎么搞呢?
可以用bind,不是套接字中的bind,而是C++中的函数绑定器,我前面将C++11的那篇博客中讲过,bind可以让函数用的时候少传参数(不懂的同学看这篇:【C++】C++11中比较重要的内容介绍):
此时可以等待listenSock就绪了,想要获取就绪sock就要有一个专门存放epoll_event的数组来获取就绪的文件描述符,和前面写的那个epoll一样:
开空间:
记得析构释放空间:
Epoll中封装的epoll_wait:
服务器等待事件就绪:
其中的isExist:
关于哈希的操作我就不讲了,前面博客中讲过的。
looponce这个函数其实是对刚刚写的简易Epoll服务器中的Start内部的一些逻辑做了封装,looponce就是循环一次的意思:
现在就可以测试一下能不能接收到连接了:
成功。
有没有发现这里和前面不太一样的地方,没有疯狂打印get new link,因为这里是ET模式。底层不会一直通知上层,只会通知一次。所以这里只会打印一次。
ok,现在已经能接收到连接的通知了,下面就正式接收连接:
对于listenSock只用关心读,也就是接收连接,但对于用来IO的sock读、写、异常都要关心,所以需要我们在类内提供三个专门负责通信时进行处理读、写、异常的函数:
先加上打印方便等会测试。
那么Connection对象添加这三个函数的时候又得要用到绑定器了:
测试一下:
成功。
但是,这里的Accepter是有问题的,所有的文件描述符在起初都被设置成了非阻塞的,Accepter如何保证底层只有一个连接就绪呢?Accepter这里的写法只是接收了一个连接,就像5KB只拿了1KB一样。所以说对于ET模式下的读事件必须保证把底层的数据全部读完,所以得这样来搞:
没接收到连接也要特殊处理,前一篇博客说过,如果调用这样的接口失败还有可能是因为读完了/写没有空间了或者是因为信号被中断了当前的读写,listenSock是非阻塞的,所以最后一次读取一定不会被阻塞,errno会被设置成EWOULDBLOCK,所以修改如下:
这样就完美了,这里我将EWOULDBLOCK的地方打印一下方便测试:
测试:
正常。
然后再来修改一下Reader,Reader也是需要循环进行读取,最后一次判断EWOULDBLOCK:
异常处理的函数:
就是进行一些清理工作,等会Writer也是要用到的。
OK,接着说回Recver,服务端接收到数据后,不进行数据处理,而是交给上层处理,现在就来写写上层的处理,需要定制协议(协议我前面博客中讲过了,这里不细说了),这里就直接用特殊字符作为分隔符来实现协议,比如说我现在要实现一个网络版本的计算器,那么客户端和在发送和处理数据的时候都要按照这个协议来处理。比如说我用X来作为分隔符,两边要加上空格,那么一个完整报文的格式是a空格+空格bX,比如说1 + 2X就是一个完整的报文。
那么客户端发送的数据就有可能是这样的:1 + 2X3 * 4X6 - 3X,这里面就有三个完整的报文,所以服务端接收到这个报文后就需要对其中的报文分开,这就是处理粘包问题,新建一个Protocol.hpp,下面就来实现一下这个函数:
Reader里面加个打印方便观察:
测试:
很成功。
不过其中后面会多出来两个字符\r\n,这里可以清除掉这两个字符,从而使得前后两次不连续的数据能正确放在接收缓冲区中:
这里修改一下接收到数据后打印的东西:
测试:
ok,这样就解决了粘包问题,下面就可以将完整的报文交给上层进行处理了,我这里选择再在类内搞一个新的:
定义一个类内的函数对象:
可以在构造函数处初始化,也可以在这里:
然后来写上层的这个NetCal,分如下几步:
关于序列化和反序列化的东西我就不再重新写一遍了,重点不在此,前面博客也是讲过的,我就直接用前面博客中的代码了:
// 方便等会发送,方便客户端进行解码
void Encode(std::string &s)
{
s += SEP;
}
// 请求中的字段
class Request
{
public:
// 这里是原来客户端用的,但是这里没有写客户端就不用这个了。
std::string Serialize()
{
std::string str;
str = std::to_string(x_);
str += SPACE;
str += op_; // TODO
str += SPACE;
str += std::to_string(y_);
return str;
}
// 这个是给服务端用的,用来将完整报文中的字段分出来
bool Deserialized(const std::string &str) // 1 + 1
{
std::size_t left = str.find(SPACE);
if (left == std::string::npos)
return false;
std::size_t right = str.rfind(SPACE);
if (right == std::string::npos)
return false;
x_ = atoi(str.substr(0, left).c_str());
y_ = atoi(str.substr(right + SPACE_LEN).c_str());
if (left + SPACE_LEN > str.size())
return false;
else
op_ = str[left + SPACE_LEN];
return true;
}
public:
Request()
{
}
Request(int x, int y, char op) : x_(x), y_(y), op_(op)
{
}
~Request() {}
public:
int x_; // 是什么?
int y_; // 是什么?
char op_; // '+' '-' '*' '/' '%'
};
// 响应
class Response
{
public:
// 这给是给服务端用的,用来将响应中的结果序列化
std::string Serialize()
{
std::string s;
s = std::to_string(code_);
s += SPACE;
s += std::to_string(result_);
return s;
}
// 这是给客户端用的,但是也是没写,所以派不上用场
bool Deserialized(const std::string &s)
{
std::size_t pos = s.find(SPACE);
if (pos == std::string::npos)
return false;
code_ = atoi(s.substr(0, pos).c_str());
result_ = atoi(s.substr(pos + SPACE_LEN).c_str());
return true;
}
public:
Response()
{
}
Response(int result, int code) : result_(result), code_(code)
{
}
~Response() {}
public:
int result_; // 计算结果
int code_; // 计算结果的状态码
};
上面的代码在Protocol中,下面的计算器在EpollServerET.cc:
static Response calculator(const Request &req)
{
Response resp(0, 0);
switch (req.op_)
{
case '+':
resp.result_ = req.x_ + req.y_;
break;
case '-':
resp.result_ = req.x_ - req.y_;
break;
case '*':
resp.result_ = req.x_ * req.y_;
break;
case '/':
if (0 == req.y_)
resp.code_ = 1;
else
resp.result_ = req.x_ / req.y_;
break;
case '%':
if (0 == req.y_)
resp.code_ = 2;
else
resp.result_ = req.x_ % req.y_;
break;
default:
resp.code_ = 3;
break;
}
return resp;
}
这里就是一个计算器,通过一个request,经过计算,得到一个Response,用一下:
放到发送缓冲区后就可以让服务端发送数据了,那么先来写写writer来写一下,首先还是要循环进行发送,等errno为EAGAIN或用户定义的发送缓冲区满了的时候再停止:
那么用户和内核缓冲区的问题怎么处理呢?
当用户发送缓冲区空了,就不发了,直接关掉对于当前sock写的关心;
当内核发送缓冲区满了,那就等等,让内核缓冲区中的数据发走,再调用Writer进行发送。
如何做到让去掉对sock写的关心呢?
得再在Epoll中搞一个接口:
服务器中给一个:
然后在Writer中调用这个接口就可以修改读和写的关心了:
但是有个问题:
此时虽然服务端能处理数据,但是数据发送不出去,因为写事件一直是没有就绪的状态,所以Writer接口调不动,怎么改改呢?
我刚刚在NetCal中还有一步,就是想办法让服务端发送数据,这时候Connection中的回指指针就派上用场了:
通过回指指针调用到服务端中的EnableReadWrite接口,非常巧妙。
测试:
其中的0 88X0 2X0 72X代表的就是每次计算的结果,65 + 23 = 88,26 / 13 = 2,95 - 23 = 72。
那么这里就完成了一个简易的ET服务器。其实上层在处理业务的时候完全可以接入一个线程池来处理,当用户比较多的时候,就可以搞一个线程池来专门处理用户的业务,线程池我就不细讲了,前面的博客中有,想了解一下的同学看这篇:【Linux】线程详解完结篇——信号量 + 线程池 + 单例模式 + 读写锁
还有其他可以优化的地方,比如说设置一个计时的东西,当某些客户端长时间没有发送消息的时候就可以将连接关闭,这样就会减少一下服务器的压力,简单说一下怎么实现,在Connection中再添加一个int类型的time字段,初始化的时候就调用time()函数来形成一个时间戳:
每个连接每次接收到新的数据的时候就更新一下这个time字段:
然后再在Start的while中每次都遍历一下所有的连接,获取遍历时的时间戳,然后让这两个时间戳相减,当结果大于某个数的时候就可以认为客户端已经断开了,那就直接调用异常处理,不再维护这个链接,这样服务器的压力能减少一点:
这个函数我就不提供了,反正大概逻辑就是这样,很简单,就不搞了。
后面这个Reactor模式的Epoll服务器目的在于读、写、异常事件的处理上,像什么序列化反序列化的都是边角料的东西。
该讲的都讲了。
完整代码(做过修改的和新的)
Sock.hpp
#pragma once
#include "LogMessage.hpp"
#include <iostream>
#include <string>
#include <memory>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>
#include <fcntl.h>
// 对套接字相关的接口进行封装
class Sock
{
private:
static const int gBackLog = 20;
public:
// 1. 创建套接字
static int Socket()
{
/*先AF_INET确定网络通信*/ /*这里用的是TCP,所以用SOCK_STREAM*/
int listenSock = socket(AF_INET, SOCK_STREAM, 0);
// 创建失败返回-1
if(listenSock == -1)
{
LogMessage(FATAL, _F, _L, "server create socket fail");
exit(2);
}
LogMessage(DEBUG, _F, _L, "server create socket success, listen sock::%d", listenSock);
// 创建成功
return listenSock;
}
// 2. bind 绑定IP和port
static void Bind(int listenSock, uint16_t port, const std::string& ip = "0.0.0.0")
{
sockaddr_in local; // 各个字段填充
memset(&local, 0, sizeof(local));
// 若为空字符串就绑定当前主机所有IP
local.sin_addr.s_addr = inet_addr(ip.c_str());
local.sin_port = htons(port);
local.sin_family = AF_INET;
/*填充好了绑定*/
if(bind(listenSock, reinterpret_cast<sockaddr*>(&local), sizeof(local)) < 0)
{
LogMessage(FATAL, _F, _L, "server bind IP+port fail :: %d:%s", errno, strerror(errno));
exit(3);
}
LogMessage(DEBUG, _F, _L, "server bind IP+port success");
}
// 3. listen为套接字设置监听状态
static void Listen(int listenSock)
{
if(listen(listenSock, gBackLog/*后面再详谈listen第二个参数*/) < 0)
{
LogMessage(FATAL, _F, _L, "srever listen fail");
exit(4);
}
LogMessage(NORMAL, _F, _L, "server init success");
}
// 4.accept接收连接 输出型参数,返回客户端的IP + port
static int Accept(int listenSock, std::string &clientIp, uint16_t &clientPort)
{
/*客户端相关字段*/
sockaddr_in clientMessage;
socklen_t clientLen = sizeof(clientMessage);
memset(&clientMessage, 0, clientLen);
// 接收连接
int serverSock = accept(listenSock, reinterpret_cast<sockaddr*>(&clientMessage), &clientLen);
// 对端的IP和port信息
clientIp = inet_ntoa(clientMessage.sin_addr);
clientPort = ntohs(clientMessage.sin_port);
if(serverSock < 0)
{
// 这里没连接上不能说直接退出,就像张三没有揽到某个客人餐馆就不干了,所以日志等级为ERROR
return -1;
}
else
{
LogMessage(NORMAL, _F, _L, "server accept connection success ::[%s:%d] server sock::%d", \
clientIp.c_str(), clientPort,serverSock);
}
return serverSock;
}
static void NonBlock(int sock)
{
int oldFl = fcntl(sock, F_GETFL);
if(oldFl == -1)
{
LogMessage(FATAL, "fcntl GETFL err, errno[%d]::%s", errno, strerror(errno));
exit(6);
}
fcntl(sock, F_SETFL, oldFl | O_NONBLOCK);
}
};
Epoll.hpp
#pragma once
#include <sys/epoll.h>
#include <iostream>
class Epoll
{
const static int gsize = 128;
public:
int CreateEpoll()
{ // 创建epoll模型
_epfd = epoll_create(gsize);
if(_epfd < 0) exit(5);
return _epfd;
}
void AddSockToEpoll(int sock, uint32_t events)
{ // 将sock添加到epoll中
epoll_event eevent;
eevent.data.fd = sock;
eevent.events = events;
epoll_ctl(_epfd, EPOLL_CTL_ADD, sock, &eevent);
}
void DelSockFromEpoll(int sock)
{ // 删除Epoll中对sock的关心
epoll_ctl(_epfd, EPOLL_CTL_DEL, sock, nullptr);
}
void ModEpoll(int sock, uint32_t events)
{ // 修改Epoll中关心的事件
events |= EPOLLET;
epoll_event eevent;
eevent.data.fd = sock;
eevent.events = events;
epoll_ctl(_epfd, EPOLL_CTL_MOD, sock, &eevent);
}
int WaitEpoll(epoll_event *revents, int revnum, int timeout)
{ // 等sock就绪
return epoll_wait(_epfd, revents, revnum, timeout);
}
public:
int _epfd;
};
Protocol.hpp
#pragma once
#include <iostream>
#include <vector>
#define SEP "X"
#define SEP_LEN strlen(SEP)
#define SPACE " "
#define SPACE_LEN strlen(SPACE)
// 处理粘包问题,将完整的报文分开
/*vs就是处理完整个包后的所有的完整小包,输出*/
void SplitMessage(std::string &message, std::vector<std::string> &vs)
{ /*message就是黏在一块的包,也就是接收缓冲区中的数据,输出*/
while(1)
{
// 先找到其中的SEP
size_t pos = message.find(SEP);
if(pos == std::string::npos)
{ // 没有SEP,也就是没有完整报文了
break;
}
// 取出一个完整的包
std::string tmp = message.substr(0, pos);
// 删掉接收缓冲区中刚刚的包
message.erase(0, pos + SEP_LEN);
// 将完整的包添加到vs中
vs.push_back(tmp);
}
}
void Encode(std::string &s)
{
s += SEP;
}
// 请求中的字段
class Request
{
public:
// 这里是原来客户端用的,但是这里没有写客户端就不用这个了。
std::string Serialize()
{
std::string str;
str = std::to_string(x_);
str += SPACE;
str += op_; // TODO
str += SPACE;
str += std::to_string(y_);
return str;
}
// 这个是给服务端用的,用来将完整报文中的字段分出来
bool Deserialized(const std::string &str) // 1 + 1
{
std::size_t left = str.find(SPACE);
if (left == std::string::npos)
return false;
std::size_t right = str.rfind(SPACE);
if (right == std::string::npos)
return false;
x_ = atoi(str.substr(0, left).c_str());
y_ = atoi(str.substr(right + SPACE_LEN).c_str());
if (left + SPACE_LEN > str.size())
return false;
else
op_ = str[left + SPACE_LEN];
return true;
}
public:
Request()
{
}
Request(int x, int y, char op) : x_(x), y_(y), op_(op)
{
}
~Request() {}
public:
int x_; // 是什么?
int y_; // 是什么?
char op_; // '+' '-' '*' '/' '%'
};
// 响应
class Response
{
public:
// 这给是给服务端用的,用来将响应中的结果序列化
std::string Serialize()
{
std::string s;
s = std::to_string(code_);
s += SPACE;
s += std::to_string(result_);
return s;
}
// 这是给客户端用的,但是也是没写,所以派不上用场
bool Deserialized(const std::string &s)
{
std::size_t pos = s.find(SPACE);
if (pos == std::string::npos)
return false;
code_ = atoi(s.substr(0, pos).c_str());
result_ = atoi(s.substr(pos + SPACE_LEN).c_str());
return true;
}
public:
Response()
{
}
Response(int result, int code) : result_(result), code_(code)
{
}
~Response() {}
public:
int result_; // 计算结果
int code_; // 计算结果的状态码
};
EpollServer.hpp
#pragma once
#include "Sock.hpp"
#include "Epoll.hpp"
#include <string>
#include <functional>
#include <unordered_map>
void SplitMessage(std::string &message, std::vector<std::string> &vs);
namespace FangZhang
{
class Connection;
class EpollServerET;
using func_t = std::function<void(Connection*)>;
class Connection
{
public:
Connection(int sock = -1, EpollServerET * prsvr = nullptr) // 初始化给-1,表示不存在
: _sock(sock)
, _prsvr(prsvr) // 这个回指指针等会有用,这里暂时体现不出来用处
{}
void SetCallBacks(func_t readCallBack, func_t sendCallBack, func_t exceptCallBack)
{ // 给每个回调赋值
_readCallBack = readCallBack;
_sendCallBack = sendCallBack;
_exceptCallBack = exceptCallBack;
}
public:
int _sock; // 专门通信的sock
std::string _readBuff; // 接收缓冲区 |这里的缓冲区有点小BUG,就是不能
std::string _sendBuff; // 发送缓冲区 |处理二进制流,但是文本还是能处理的
func_t _readCallBack; // 读取回调
func_t _sendCallBack; // 发送回调
func_t _exceptCallBack; // 异常回调
EpollServerET *_prsvr; // 回指指针,等会有大用
//time_t _lastTime = time(nullptr); // 保活机制
};
class EpollServerET
{
// 回调
using callBack_t = std::function<void(Connection*, std::string&)>;
const static int REVENT_NUM = 100;
public:
EpollServerET(int port = 8080)
: _port(port)
, _maxReventNum(REVENT_NUM)
{
// 创建套接字
_listenSock = Sock::Socket();
Sock::Bind(_listenSock, _port); // 绑定
Sock::Listen(_listenSock); // 设置监听状态
int epfd = _poll.CreateEpoll(); // 创建epoll模型
// 将listenSock添加到epoll模型中
AddSock(_listenSock, std::bind(&EpollServerET::Accepter, this, std::placeholders::_1), nullptr, nullptr);
// 给就绪数组开空间
_revents = new epoll_event[_maxReventNum];
}
// 启动服务器
void Start(callBack_t cb)
{
_cb = cb;
while(1)
{
// IsAlive(); 保活机制
LoopOnce();
}
}
~EpollServerET()
{
if(_listenSock >= 0) close(_listenSock);
if(_revents) delete[] _revents;
}
void EnableReadWrite(Connection* conn, bool isRead, bool isWrite)
{
uint32_t events = (isRead ? EPOLLIN : 0) | (isWrite ? EPOLLOUT : 0);
_poll.ModEpoll(conn->_sock, events);
}
private:
void AddSock(int sock, func_t readCallBack, func_t sendCallBack, func_t exceptCallBack)
{ // 添加文件描述符的同时也将对应的回调函数处理好
// 将sock设置为非阻塞的
Sock::NonBlock(sock);
// 创建sock对应的Connection对象,并将字段填充好
Connection* conn = new Connection(sock, this);
conn->SetCallBacks(readCallBack, sendCallBack, exceptCallBack);
/*设置Epoll为ET模式*/
_poll.AddSockToEpoll(sock, EPOLLIN | EPOLLET); // 任何多路转接服务器,默认打开对读事件的关心
_sockCon.insert(std::make_pair(sock, conn)); // 哈希映射中记录一下
}
void LoopOnce()
{
// 这里直接以非阻塞方式等待
int timeout = -1;
int n = _poll.WaitEpoll(_revents, _maxReventNum, timeout);
if(n == -1)
{ // 调用出错
LogMessage(WARING, "epoll_wait fail, errno[%d]:%s", errno, strerror(errno));
}
else if(n == 0)
{ // 等待超时
std::cout << "time out" << std::endl;
}
else
{ // 等待成功
for(int i = 0; i < n; ++i)
{ // 遍历就绪的事件
int sock = _revents[i].data.fd;
uint32_t events = _revents[i].events;
if(events & EPOLLIN)
{ // 读事件就绪
if(isExist(sock) && _sockCon[sock]->_readCallBack != nullptr)
{ // 判断映射中是否有这个sock,有了再判断对应的Connection中的读回调是否就绪
_sockCon[sock]->_readCallBack(_sockCon[sock]);
}
}
if(events & EPOLLOUT)
{ // 写事件就绪
if(isExist(sock) && _sockCon[sock]->_sendCallBack != nullptr)
{ // 判断映射中是否有这个sock,有了再判断对应的Connection中的写回调是否就绪
_sockCon[sock]->_sendCallBack(_sockCon[sock]);
}
}
}
}
}
void Accepter(Connection* conn)
{
while(1)
{
std::string clientIp;
uint16_t clientPort;
int sock = Sock::Accept(_listenSock, clientIp, clientPort);
if(sock >= 0)
{ // 接收到连接
// 获取到sock之后就添加到epoll模型、创建一个Connection对象、存入哈希映射中
AddSock(sock,
std::bind(&EpollServerET::Reader, this, std::placeholders::_1),
std::bind(&EpollServerET::Writer, this, std::placeholders::_1),
std::bind(&EpollServerET::Excepter, this, std::placeholders::_1)
); // 直接调用AddSock就能实现
// 对于listenSock只用关心读,也就是接收连接,但对于用来IO的sock读、写、异常都要关心
// 所以需要我们在类内提供三个专门负责通信时进行处理读、写、异常的函数
}
else
{ // 没接收到连接
// 读取完毕,没有连接了,就不用再接收了。accept就会将错误码设置为EAGAIN
if(errno == EWOULDBLOCK || errno == EAGAIN) break;
// 遭到信号中断当前读取,所以是本次读取未完成,还要继续读取
else if(errno == EINTR) continue;
else
{ // 这里才是真正的异常
LogMessage(ERROR, _F, _L, "server accept connection fail");
break;
}
}
}
}
void Reader(Connection* conn)
{ // 专门负责读的函数
//conn->_lastTime = time(nullptr); // 更新一下最后访问的时间
int sock = conn->_sock;
bool err = false;
while(1)
{
char buff[1024] = {0};
// 这里如果用的是recv(sock, buff, sizeof(buff) - 1, 0),最后一个flag设置成0也是没关系的
// 因为前面已经将sock设置成非阻塞的了
int n = read(sock, buff, sizeof(buff) - 1);
if(n > 0)
{ // 读取到数据
// 服务端只负责读取数据,不要处理数据,处理数据是上层业务负责的
buff[n - 2] = 0;
conn->_readBuff += buff;
// 服务端将数据放到接受缓冲区中,然后上层再根据接收缓冲区中的数据进行处理
printf("%s : %d\n", conn->_readBuff.c_str(), conn->_readBuff.size());
}
else if(n == 0)
{ // 对端关闭连接
LogMessage(NORMAL, _F, _L, "client closed, sock[%d]", sock);
conn->_exceptCallBack(conn); // 交给异常来处理
err = true;
break;
}
else
{ // 读取失败
if(errno == EWOULDBLOCK || errno == EAGAIN)
{
break;
}
else if(errno == EINTR) continue;
else
{
conn->_exceptCallBack(conn); // 交给异常来处理
LogMessage(ERROR, _F, _L, "read err, errno[%d]::%s", errno, strerror(errno));
break;
}
}
}
if(!err)
{ // 这里对端没有关闭连接,处理还在的连接中的数据
std::vector<std::string> vs;
SplitMessage(conn->_readBuff, vs); // 取完整报文
for(auto &msg : vs)
{ // 走到这里一定是有完整报文的,上层自行处理
_cb(conn, msg);
}
}
}
void Writer(Connection* conn)
{ // 专门负责写的函数
while(1)
{
int res = write(conn->_sock, conn->_sendBuff.c_str(), conn->_sendBuff.size());
if(res > 0)
{
conn->_sendBuff.erase(0, res); // 去掉用户发送缓冲区已经发送的数据
if(conn->_sendBuff.size() == 0) break; // 如果用户发送缓冲区已经空了就不要再发送了
}
else
{ // res <= 0
if(errno == EWOULDBLOCK || errno == EAGAIN) break;//说明发送缓冲区满了,那就下一次再发送
else if(errno == EINTR) continue; // 被信号打断了,回来之后继续发送
else
{ // 真的出错了
LogMessage(ERROR, _F, _L, "write err, errno[%d]:%s", errno, strerror(errno));
conn->_exceptCallBack(conn);
return;
}
}
}
// 此处可能是用户发送缓冲区空了,也可能是内核缓冲区满了,也可能是真的出错了
if(conn->_sendBuff.empty())
{ // 用户发送缓冲区空了,直接去掉对当前sock写的关心
EnableReadWrite(conn, true, false);
}
else
{
EnableReadWrite(conn, true, true);
}
}
void Excepter(Connection* conn)
{ // 专门负责处理异常的函数
// 先判断sock在不在哈希中,不在就直接返回
if(!isExist(conn->_sock)) return;
// 删除Epoll中的对该sock的关心
_poll.DelSockFromEpoll(conn->_sock);
// 去掉映射
_sockCon.erase(conn->_sock);
// 关闭文件描述符
close(conn->_sock);
// 释放Connection对象的空间,不然内存泄漏了
delete conn;
}
bool isExist(int sock)
{
auto it = _sockCon.find(sock);
if(it == _sockCon.end())
{
return false;
}
return true;
}
private:
int _listenSock;
uint16_t _port;
Epoll _poll; // 用来进行epoll的相关操作
std::unordered_map<int, Connection*> _sockCon; // 存放sock与对应Connection对象地址的映射
epoll_event *_revents; // 就绪的文件描述符事件数组
int _maxReventNum; // 数组元素个数
callBack_t _cb; // 上层处理完整报文的方法
};
}
EpollServer.cc
#include "EpollServerET.hpp"
#include "Protocol.hpp"
#include <memory>
static Response calculator(const Request &req)
{
Response resp(0, 0);
switch (req.op_)
{
case '+':
resp.result_ = req.x_ + req.y_;
break;
case '-':
resp.result_ = req.x_ - req.y_;
break;
case '*':
resp.result_ = req.x_ * req.y_;
break;
case '/':
if (0 == req.y_)
resp.code_ = 1;
else
resp.result_ = req.x_ / req.y_;
break;
case '%':
if (0 == req.y_)
resp.code_ = 2;
else
resp.result_ = req.x_ % req.y_;
break;
default:
resp.code_ = 3;
break;
}
return resp;
}
void NetCal(FangZhang::Connection* conn, std::string& msg)
{
// 反序列化,得到计算的元素
Request req;
req.Deserialized(msg);
// 计算得到结果
Response resp = calculator(req);
// 将结果序列化,从而方便返回给客户端
std::string res = resp.Serialize();
// res即结果,进行加码
Encode(res);
// 加码后放到发送缓冲区中待发送
conn->_sendBuff += res;
// 想办法让服务端将计算的结果返回给客户端
conn->_prsvr->EnableReadWrite(conn, true, true);
}
int main()
{
std::unique_ptr<FangZhang::EpollServerET> svr(new FangZhang::EpollServerET());
svr->Start(NetCal);
return 0;
}
到此结束。。。