本章代码Gitee地址:EpollServer
文章目录
- 1. epoll接口
- 1.1 epoll_create
- 1.2 epoll_wait
- 1.3 epoll_ctl
- 2. epoll原理
- 3. epoll_server
- 4. epoll两种工作模式
1. epoll接口
1.1 epoll_create
#include <sys/epoll.h>
int epoll_create(int size);
参数int size
理论上可以随便写(已废弃)
返回值:
- 成功返回一个文件描述符
- 失败返回
-1
1.2 epoll_wait
#include <sys/epoll.h>
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
参数:
-
int epfd
:epoll_create
的返回值 -
struct epoll_event *events, int maxevents
:用户及缓冲区,返回已经就绪的文件描述符和事件typedef union epoll_data { void *ptr; int fd; uint32_t u32; uint64_t u64; } epoll_data_t; struct epoll_event { uint32_t events; //位图传递 epoll_data_t data; // };
-
int timeout
:超时时间,单位是毫秒,0
为非阻塞,-1
为阻塞式
返回值:已经就绪的文件描述符的个数
1.3 epoll_ctl
#include <sys/epoll.h>
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
参数:
-
int epfd
:epoll_create
的返回值 -
int op
:EPOLL_CTL_ADD //增添 EPOLL_CTL_MOD //修改 EPOLL_CTL_DEL //删除
-
int fd, struct epoll_event *event
:哪个文件描述符上的哪个事件
select
和poll
都是用数组维护的,需要用户进行管理
2. epoll原理
网卡是外设,当硬件就绪之后,会以硬件中断的方式来告诉操作系统,将网卡的数据读到网卡驱动上,而操作系统读数据是从文件缓冲区读取数据。
所以为了支持epoll
,操作系统支持三种机制:
-
内核会维护一颗红黑树,红黑树节点里面包含:
struct rb_node { int fd; //内核要关系的文件描述符 uint32_t event; //要关系的事件 位图形式 //... }
-
此外还会维护一个就绪队列,一旦红黑树上有节点就绪,此时就会将该节点链入到队列当中
struct list_node
{
int fd; //已就绪的文件描述符
uint32_t event; //已就绪的事件
//...
}
-
操作系统的底层网卡,是允许操作系统注册一些回调机制。
操作系统内部提供一个回调函数,网卡以中断的方式将数据搬到了网卡驱动层,驱动层当中有数据就绪了,那么数据链路层就会自动调用对应的回调函数。
这个回调函数要做的就是:- 向上交付
- 数据到来解包交到
tcp
接收队列 - 查找
rb_tree->fd
- 构建就绪节点,插入就绪队列
以上三套机制,就叫做epoll
模型
Linux一切接文件,strcut file
指针指向这个epoll
模型,然后将struct file
对象添加到进程文件描述符表里面,所以epoll
的返回值是一个文件描述符。
epoll
优势:
-
检测就绪时间复杂度为O(1),判断队列是否为空
获取就绪队列时间复杂度O(n)
-
fd
、event
没有上限,所以的文件描述符和关系的事件都是由红黑树管理的,这颗红黑树多大,操作系统决定
如何看待这颗红黑树?
select
和poll
都需要辅助数组,数组用户维护,而这颗红黑树就相当于之前我们自己维护的数组,只不过在epoll
里面是由系统管理
epoll_wait
返回值表示有多少事件就绪,将就绪的节点一个一个弹出,依次放入数组,就绪事件是连续的
3. epoll_server
#include<iostream>
#include<memory>
#include<string>
#include"Socket.hpp"
#include"Log.hpp"
#include"Epoller.hpp"
#include"Nocopy.hpp"
uint32_t EVENT_IN = (EPOLLIN);
uint32_t EVENT_OUT = (EPOLLOUT);
class EpollServer : public Nocopy
{
static const int defaultnum = 64; //默认一次性最多获取64个事件
public:
EpollServer(uint16_t port)
:_port(port),
_listensock_ptr(new MySocket()),
_epoller_ptr(new Epoller())
{}
void Init()
{
//创建套接字
_listensock_ptr->Socket();
//绑定套接字
_listensock_ptr->Bind(_port);
//监听套接字
_listensock_ptr->Listen();
log(Info, "create listen socket success: %d", _listensock_ptr->Getfd());
}
void Accepter()
{
// 获取新链接
std::string clientip;
uint16_t clientport;
int sock = _listensock_ptr->Accept(&clientip, &clientport);
if (sock > 0)
{
// 不能直接读取,获取连接不代表发送了数据
// 让epoll去关心
_epoller_ptr->EpollerCtl(EPOLL_CTL_ADD, sock, EVENT_IN);
log(Info, "get a new link, clientip: %s, clientport: %d", clientip.c_str(), clientport);
}
}
void Recver(int fd)
{
// 读事件就绪
char buffer[1024];
ssize_t n = read(fd, buffer, sizeof(buffer) - 1); //BUG
if (n > 0)
{
buffer[n] = 0;
std::cout << "get a message: " << buffer << std::endl;
//返回
std::string echo_str = "server echo $";
echo_str += buffer;
write(fd, echo_str.c_str(), echo_str.size());
}
else if (n == 0)
{
log(Info, "client quit, me too, close fd:%d", fd);
//从epoll当中移除 删除红黑树节点
_epoller_ptr->EpollerCtl(EPOLL_CTL_DEL, fd, 0);
close(fd); //细节 先移除再关闭
}
else
{
log(Warning, " read error, close fd:%d", fd);
}
}
void Dispatcher(struct epoll_event revs[], int num)
{
for(int i = 0; i < num; i++)
{
uint32_t events = revs[i].events;
int fd = revs[i].data.fd;
if(events & EVENT_IN)
{
//读事件就绪
if(fd == _listensock_ptr->Getfd())
{
Accepter();
}
else
{
//其他事件就绪
Recver(fd);
}
}
else if(events & EVENT_OUT)
{
//写事件就绪
}
}
}
void Start()
{
//listensock套接字添加进epoll当中
//listensock和它关心的事件 本质上添加到内核epoll模型的rb_tree里面
_epoller_ptr->EpollerCtl(EPOLL_CTL_ADD, _listensock_ptr->Getfd(), EVENT_IN); //关心读事件
struct epoll_event revs[defaultnum]; //存放就绪的事件
for(; ;)
{
//epoll只负责等待
int n = _epoller_ptr->EpollerWait(revs, defaultnum);
if(n > 0)
{
//有事件就绪
log(Debug, "event happend, fd is : %d", revs[0].data.fd);
//提取就绪事件 epoll_wait返回值会返回就绪的事件数量
//如果数量大于定义的大小, 下次再捞
Dispatcher(revs, n);
}
else if(n == 0)
{
log(Info, "time out...");
}
else
{
log(Error, "epoll_wait error");
}
}
}
~EpollServer()
{
_listensock_ptr->Close();
}
private:
std::shared_ptr<MySocket> _listensock_ptr;
std::shared_ptr<Epoller> _epoller_ptr;
//MySocket _listensock;
uint16_t _port;
//Epoller _epoller;
};
4. epoll两种工作模式
LT
模式:
epoll
默认工作模式是LT(Level Triggered水平触发)
模式
当事件到来时,如果上层一直不取走,底层会一直通知
select
和poll
采用的也是LT
模式
EL
模式:
EL(Edge Triggered边缘触发)
模式是当数据变化的时候,才会通知一次
数据从无到有,从少到多
打个比方:
快递员A(
LT模式
)送快递的时候,如果客户一直不取,他就一直打电话,说你的快递到了,签收一下;快递员B(
ET模式
)送快递的时候,只通知一次,然后就放在驿站了;如果之后又有快递到了,则又通知一次;快递员A在一个小时只能,可能只能通知到几个客户;而快递员B在一个小时之内可以通知多个客户
ET
不止通知效率高于LT
,IO
效率也高于LT
由于
ET
只通知一次,所以就倒逼上层,每次都要把本轮数据全部取走
如何知道本轮数据全部取完?
比如说,我们有550g的大米,每天要吃100g,前5天正常,到第6天的时候,原本是要吃50g大米的,可是只能吃50g了,这就说明大米没有了
也就是说当需要读取的目标数据大于实际读取的数据的时候,就表明数据已经全部取走。
这就需要我们循环读取数据,直到读取出错为止,可是fd
是默认是阻塞的,所以在ET
模式下,所有的fd
要设置成非阻塞Non_block
,如果不设置,程序会一直阻塞住每次都能取走全部的数据,接收缓冲区就有空间了,这样
tcp
就能给对方通知更大的窗口,然后对方就可以给我们发送更多的数据
ET
是否一定比LT
高效?将
LT
所有的fd
设置成non_block
非阻塞,然后循环读取,这就个ET
类似了
这里所谓的通知一次和每次通知,本质上其实是向就绪队列添加一次还是每次都添加