【Linux】IO多路转接技术Epoll的使用
文章目录
- 【Linux】IO多路转接技术Epoll的使用
- 前言
- 正文
- 接口介绍
- 工作原理
- LT模式与ET模式
- 边缘触发(ET)
- 水平触发(LT)
- 理解ET模式和非阻塞文件描述符
- ET模式`epoll`实现TCP服务器
- 简单地封装`epoll`系统调用
- 封装网络套接字接口
- 编写TCP服务器
前言
在学习epoll之前,我们首先了解一下Linux中的多路复用技术:
在Linux系统中,IO多路复用是一种重要的技术,它允许一个进程同时监视多个文件描述符,一旦某个描述符准备好进行读取(通常是读就绪或写就绪),内核会通知该进程进行相应的读写操作。这样,我们可以有效地处理多个I/O事件而不需要创建多个线程或进程,从而减小系统开销。这种可以同时监视多个文件描述符的技术经常用于会维护很多文件描述符的高并发网络编程。其中多路复用共有三种方案,分别是select、poll、epoll,而epoll是目前的多路复用技术中最先进且常用的技术。
正文
接口介绍
epoll
是 Linux 下的一种 I/O 事件通知机制,用于高效地处理大量的文件描述符(sockets、文件等)上的 I/O 事件。epoll
提供了三个主要的接口函数:
epoll_create
:创建一个epoll
实例。
- 功能:创建一个
epoll
实例,返回一个文件描述符,用于标识该epoll
实例。- 参数:无参数或者一个整数,指定要返回的文件描述符的数量(在新的内核版本中该参数已经被忽略)。
- 返回值:返回一个指向
epoll
实例的文件描述符,如果出错,返回 -1。
epoll_ctl
:控制epoll
实例上的事件。
- 功能:向
epoll
实例中添加、修改或删除感兴趣的事件。- 参数:
epfd
:epoll
实例的文件描述符。op
:要执行的操作,可以是EPOLL_CTL_ADD
、EPOLL_CTL_MOD
或EPOLL_CTL_DEL
。fd
:需要添加、修改或删除的文件描述符。event
:指向epoll_event
结构体数组的指针,描述了关联于文件描述符fd
的事件。- 返回值:成功时返回 0,失败时返回 -1。
epoll_wait
:等待epoll
实例上的事件发生。
- 功能:阻塞等待
epoll
实例上注册的文件描述符上的事件发生。- 参数:
epfd
:epoll
实例的文件描述符。events
:指向epoll_event
结构体数组的指针,用于存储发生的事件。maxevents
:events
数组的大小,指定最多可以存储多少个事件。timeout
:等待的超时时间,单位为毫秒;如果传入 -1,则表示永远等待直到有事件发生。- 返回值:返回发生的事件的数量,如果超时则返回 0,如果出错则返回 -1。
工作原理
在大致了解了epoll的基础知识以及它的系统调用之后,我们继续来了解epoll的底层工作原理。
为了获取更高的性能以及支持更大的并发连接,epoll的底层采用了一颗红黑树以及一个就绪队列对事件进行管理。让我们画图进行表示:
每当我们调用epoll_create,系统就会在内核中帮我们创建出一个这样的结构体对应着上面的结构(进程使用这个数据结构的方式是将它的指针放入到进程控制块(task_struct
)中的文件描述符所对应的struct file
中):
struct eventpoll
{ ....
/*红黑树的根节点,这颗树中存储着所有添加到epoll中的需要监控的事件*/
struct rb_root rbr;
/*双链表中则存放着将要通过epoll_wait返回给用户的满足条件的事件*/
struct list_head rdlist;
....
};
我们由此可以发现epoll
的优势:
epoll
使用基于事件驱动的模型,可以有效地处理大量并发连接,是更高效的事件通知机制,避免了poll
中遍历线性数组的性能瓶颈。在大规模并发连接的场景下,epoll
的性能表现更为出色。epoll
在内核中利用红黑树(Red-Black Tree)来管理关心的事件和文件描述符,而不是通过传统的遍历线性数组的方式进行管理。这意味着epoll
可以轻松处理数以万计的连接,而不会随着连接数量的增加而有较大的性能下降。- 不同于
select
和poll
,在epoll
中,内核直接将数据从内核缓冲区拷贝到用户空间缓冲区,避免了额外的数据拷贝过程,提高了数据传输的效率。
LT模式与ET模式
epoll
支持两种事件触发方式:边缘触发(Edge-Triggered,简称 ET)和水平触发(Level-Triggered,简称 LT)。它们之间有一些区别,下面我会详细介绍它们以及各自的优缺点。
边缘触发(ET)
- 触发条件:只有当事件状态发生变化,由不可读/不可写变为可读/可写时,
epoll
才会通知用户空间。 - 优点:
- 对于大量并发连接,ET 触发模式下的事件通知更高效,因为它只在状态变化时通知,避免了重复通知。
- ET的触发条件倒逼程序员每次通知时都需要一次性读完本轮的所有数据,于是其更适用于非阻塞IO,能够充分地利用系统资源。
- 缺点:
- ET触发模式的代码复杂度更大。
- 用户需要及时处理完整个数据流,否则可能会错过部分数据,因为下次的触发条件是状态的变化。
- 对于文件描述符的读写操作,必须一直读取/写入直至返回
EAGAIN
错误,否则可能会错过数据。
水平触发(LT)
- 触发条件:只要文件描述符处于可读/可写状态,
epoll
就会不断通知用户空间。 - 优点:
- 对于普通的 I/O 操作,LT 触发模式下更容易使用,因为不需要像 ET 模式那样严格控制数据的读写。
- 用户处理数据时可以按照自己的节奏进行,不必担心错过部分数据。
- 缺点:
- 在大量并发连接的情况下,LT 模式可能会导致频繁的事件通知,增加了系统开销。
- 容易出现事件饥饿(Event Starvation)问题,即某些事件一直处于就绪状态但得不到及时处理。
总结来说:
- ET 触发模式适用于高性能、高并发的场景,能够更精确地通知事件状态的变化,但要求用户处理数据时要及时且完整。
- LT 触发模式更适合普通的 I/O 操作,用户可以按照自己的节奏进行数据处理,但可能会出现频繁的事件通知和事件饥饿问题。
理解ET模式和非阻塞文件描述符
使用 ET 模式的 epoll, 需要将文件描述设置为非阻塞。 这个不是接口上的要求, 而是 “工程实践” 上的要求。 假设这样的场景: 服务器接受到一个10k的请求, 会向客户端返回一个应答数据. 如果客户端收不到应答, 不会发送第 二个10k请求。
如果服务端写的代码是阻塞式的read
, 并且一次只 read
1k 数据的话(read不能保证一次就把所有的数据都读出来, 参考 man 手册的说明, 可能被信号打断), 剩下的9k数据就会待在缓冲区中。
此时由于 epoll
是ET模式, 并不会认为文件描述符读就绪。epoll_wait
就不会再次返回. 剩下的 9k 数据会一直在缓 冲区中。直到下一次客户端再给服务器写数据,epoll_wait
才能返回。
但是问题来了。
- 服务器只读到1k个数据, 要10k读完才会给客户端返回响应数据。
- 客户端要读到服务器的响应 。
- 客户端发送了下一个请求, epoll_wait 才会返回, 才能去读缓冲区中剩余的数据。
所以, 为了解决上述问题(阻塞read不一定能一下把完整的请求读完), 于是就可以使用非阻塞轮询的方式来读缓冲区, 保证一定能把完整的请求都读出来。
而如果是LT就没这个问题. 只要缓冲区中的数据没读完, 就能够让 epoll_wait
返回文件描述符读就绪.
由此我们也可以发现,其实LT模式其实未必比ET模式效率低,如果我们在LT模式中也每一次通知都直接取走所有数据,那么效率就与ET模式相同了。
ET模式epoll
实现TCP服务器
了解完关于epoll
的理论,我们来尝试使用ET方式编写一个简单的TCP服务器。
简单地封装epoll
系统调用
首先我们对epoll
接口进行简单的封装,封装成一个叫做Epoller
的类:
// 使用此类继承得到的类无法被拷贝
class nocopy
{
public:
nocopy(){}
nocopy(const nocopy&) = delete;
nocopy& operator=(const nocopy&) = delete;
};
class Epoller : public nocopy
{
static const int size = 128;
public:
Epoller()
{
_epfd = epoll_create(size);
if(_epfd == -1)
{
// lg是独立编写的打印日志模块
lg(Warning, "epoll create fail!, strerr: %s", strerror(errno));
}
else
{
lg(Info, "epoll create success! _epfd: %d", _epfd);
}
}
int EpollerWait(struct epoll_event* revs, int num, int timeout = -1)
{
// timeout 非阻塞 一直阻塞
int n = epoll_wait(_epfd, revs, num, timeout);
return n;
}
void EpollerUpdate(int oper, int sockfd, uint32_t event)
{
int n;
if(oper == EPOLL_CTL_DEL)
{
n = epoll_ctl(_epfd, oper, sockfd, nullptr);
if(n < 0)
lg(Error, "epoll event delete fail!");
lg(Debug, "epoll delete fd : %d", sockfd);
}
else
{
struct epoll_event ev;
ev.events = event;
ev.data.fd = sockfd;
n = epoll_ctl(_epfd, oper, sockfd, &ev);
if(n < 0)
lg(Error, "epollctl fail!");
}
}
~Epoller()
{
if(_epfd > 0)
close(_epfd);
}
private:
int _epfd;
};
封装网络套接字接口
然后对套接字进行简单的封装,便于对listen套接字的使用:
enum{
SocketError = 1,
BindError,
ListenError
};
const int backlog = 10;
class Sock
{
public:
Sock()
{}
void Socket()
{
sockfd_ = socket(AF_INET, SOCK_STREAM, 0);
if(sockfd_ < 0)
{
lg(Fatal, "socket error! errno:%d errstr:%s", errno, strerror(errno));
exit(SocketError);
}
int opt = 1;
setsockopt(sockfd_, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
}
void Bind(uint16_t port)
{
struct sockaddr_in local;
memset(&local, 0, sizeof(local));
local.sin_family = AF_INET;
local.sin_port = htons(port);
local.sin_addr.s_addr = INADDR_ANY;
if(bind(sockfd_, (struct sockaddr*)&local, sizeof(local)) < 0)
{
lg(Fatal, "bind error! errno:%d errstr:%s", errno, strerror(errno));
exit(BindError);
}
}
void Listen()
{
if(listen(sockfd_, backlog) < 0)
{
lg(Fatal, "listen error, errno:%d, errorstring:%s", errno, strerror(errno));
exit(ListenError);
}
}
int Accept(std::string *clientip, uint16_t *clientport)
{
struct sockaddr_in peer;
socklen_t len = sizeof(peer);
int newfd = accept(sockfd_, (struct sockaddr*)&peer, &len);
if(newfd < 0)
{
lg(Warning , "listen error, errno:%d, errorstring:%s", errno, strerror(errno));
return -1;
}
char ipstr[64];
inet_ntop(AF_INET, &(peer.sin_addr), ipstr, sizeof(ipstr));
*clientip = ipstr;
*clientport = ntohs(peer.sin_port);
return newfd;
}
bool Connect(const std::string& serverip, const uint16_t serverport)
{
struct sockaddr_in server;
memset(&server, 0, sizeof(server));
server.sin_family = AF_INET;
server.sin_port = htons(serverport);
inet_pton(AF_INET, serverip.c_str(), &(server.sin_addr));
int n = connect(sockfd_, (struct sockaddr*)&server, sizeof(server));
if(n < 0)
{
lg(Warning , "connect error, errno:%d, errorstring:%s", errno, strerror(errno));
return false;
}
return true;
}
void Close()
{
close(sockfd_);
}
int Fd()
{
return sockfd_;
}
~Sock()
{}
private:
int sockfd_;
};
编写TCP服务器
最后我们对TCP服务器进行编写,其中封装了两个类,分别是:
Connection类:用于表示TCP服务器接收到的链接,其中封装了读写缓冲区以及读写异常方法。方便TCP服务器进行管理。
TcpServer类:用于表示TCP服务器,封装了
Epoller
以及Connection类与服务器接收到的sockfd
的映射关系,以及一些服务器的方法。底层使用epoll的ET模式对事件进行关心。
class Connection;
class TcpServer;
using func_t = function<void(std::weak_ptr<Connection>)>;
using excp_func_t = function<void(std::weak_ptr<Connection>)>;
class Connection
{
public:
Connection(int sockfd, const std::weak_ptr<TcpServer> svr_ptr)
: _sockfd(sockfd),
_tcpsvr_ptr(svr_ptr)
{
}
void SetHandler(func_t recv_cb, func_t send_cb, excp_func_t excp_cb)
{
_recv_cb = recv_cb;
_send_cb = send_cb;
_excp_cb = excp_cb;
}
void AppendInbuffer(const std::string &info)
{
_inbuf += info;
}
void AppendOutbuffer(const std::string &info)
{
_outbuf += info;
}
int Sockfd()
{
return _sockfd;
}
string &Inbuffer()
{
return _inbuf;
}
string &Outbuffer()
{
return _outbuf;
}
~Connection()
{
}
private:
// TCP链接的文件描述符以及读写缓冲区
int _sockfd;
std::string _inbuf;
std::string _outbuf;
public:
// 链接的读写方法
func_t _recv_cb;
func_t _send_cb;
excp_func_t _excp_cb;
std::weak_ptr<TcpServer> _tcpsvr_ptr;
std::string _ip;
uint16_t _port;
};
uint32_t EVENT_IN = (EPOLLIN | EPOLLET);
uint32_t EVENT_OUT = (EPOLLOUT | EPOLLET);
void SetNonBlock(int fd)
{
int fl = fcntl(fd, F_GETFL);
if(fl < 0)
{
perror("fcntl");
return;
}
fcntl(fd, F_SETFL, fl | O_NONBLOCK);
std::cout << "set " << fd << " nonblock." << std::endl;
}
class TcpServer : public nocopy, public enable_shared_from_this<TcpServer>
{
private:
// 一次接收事件的最大数量
static const int num = 64;
// 接收缓冲区的默认大小
static const int g_buffer_size = 128;
private:
// Epoller的智能指针
std::shared_ptr<Epoller> _epoller_ptr;
// epoll事件的返回数组
struct epoll_event revs[num];
// listen套接字的智能指针
std::shared_ptr<Sock> _listensock_ptr;
// TCP链接的sockfd与对应Connection类的映射关系
std::unordered_map<int, std::shared_ptr<Connection>> _connections;
// 服务器的端口号
uint16_t _port;
bool _quit;
func_t _OnMessage;
public:
TcpServer(uint16_t port, func_t OnMessage)
: _port(port),
_epoller_ptr(new Epoller),
_listensock_ptr(new Sock),
_OnMessage(OnMessage)
{
}
void Init()
{
_listensock_ptr->Socket();
// ET模式下每一个sockfd都需要设置非阻塞
SetNonBlock(_listensock_ptr->Fd());
_listensock_ptr->Bind(_port);
_listensock_ptr->Listen();
lg(Info, "listensock create successfully!");
AddConnection(_listensock_ptr->Fd(), EVENT_IN,
std::bind(&TcpServer::Accepter, this, std::placeholders::_1), nullptr, nullptr);
}
void Loop()
{
_quit = false;
while (!_quit)
{
Dispatcher(-1);
// PrintConnection();
}
_quit = true;
}
void PrintConnection()
{
std::cout << "_connections fd list: ";
for (auto &connection : _connections)
{
std::cout << connection.second->Sockfd() << ", ";
std::cout << "inbuffer: " << connection.second->Inbuffer().c_str();
}
std::cout << std::endl;
}
~TcpServer()
{
}
public:
bool IsConnectionSafe(int sockfd)
{
auto iter = _connections.find(sockfd);
if (iter == _connections.end())
return false;
return true;
}
void AddConnection(int sockfd, uint32_t event, func_t recv_cb, func_t send_cb, excp_func_t excp_cb,
const string &ip = "0.0.0.0", uint16_t port = 0)
{
// 1.首先创建出Connection对象
std::shared_ptr<Connection> connection(new Connection(sockfd, shared_from_this()));
connection->SetHandler(recv_cb, send_cb, excp_cb);
connection->_ip = ip;
connection->_port = port;
// 2.然后将对应的sockfd加入进epoll内核中
_epoller_ptr->EpollerUpdate(EPOLL_CTL_ADD, sockfd, event);
// 3.最后将connection加入到映射中
_connections[sockfd] = connection;
lg(Info, "a connection create successfully! sockfd is : %d", sockfd);
}
void Dispatcher(int timeout)
{
int n = _epoller_ptr->EpollerWait(revs, num, timeout);
for (int i = 0; i < n; ++i)
{
uint32_t events = revs[i].events;
int sockfd = revs[i].data.fd;
bool flag = IsConnectionSafe(sockfd);
// 只需要处理读写事件,如果中途出现问题则直接在读写函数内进行处理
if ((events & EPOLLIN) && flag)
if (_connections[sockfd]->_recv_cb)
_connections[sockfd]->_recv_cb(_connections[sockfd]);
if ((events & EPOLLOUT) && flag)
if (_connections[sockfd]->_send_cb)
_connections[sockfd]->_send_cb(_connections[sockfd]);
}
}
void Accepter(std::weak_ptr<Connection> conn)
{
auto connection = conn.lock();
while (true)
{
struct sockaddr_in peer;
socklen_t len = sizeof(peer);
int sockfd = ::accept(connection->Sockfd(), (struct sockaddr *)&peer, &len);
if (sockfd > 0)
{
uint16_t peerport = ntohs(peer.sin_port);
char ipbuf[128];
inet_ntop(AF_INET, &(peer.sin_addr), ipbuf, sizeof(ipbuf));
// ET模式下每一个sockfd都需要设置非阻塞
SetNonBlock(sockfd);
AddConnection(sockfd, EVENT_IN,
std::bind(&TcpServer::Recver, this, std::placeholders::_1),
std::bind(&TcpServer::Sender, this, std::placeholders::_1),
std::bind(&TcpServer::Excepter, this, std::placeholders::_1),
ipbuf, peerport);
lg(Info, "get a new client link, get info-> [%s:%d], sockfd : %d", ipbuf, peerport, sockfd);
}
else
{
// 事件不就绪,直接结束轮询
if (errno == EWOULDBLOCK)
break;
// 被信号中断,重新开始轮询
else if (errno == EINTR)
continue;
else
break;
}
}
}
void Recver(std::weak_ptr<Connection> conn)
{
auto connection = conn.lock();
int sockfd = connection->Sockfd();
while (true)
{
char buff[g_buffer_size];
memset(buff, 0, sizeof(buff));
ssize_t n = recv(sockfd, buff, sizeof(buff), 0);
if (n > 0)
{
connection->AppendInbuffer(buff);
}
else if (n == 0)
{
lg(Info, "client info [%s:%d] close the link, me too.", connection->_ip.c_str(), connection->_port);
connection->_excp_cb(connection);
break;
}
else
{
if (errno == EWOULDBLOCK)
break;
else if (errno == EINTR)
continue;
else
{
lg(Warning, "sockfd: %d, client info [%s:%d] recv error...", sockfd, connection->_ip, connection->_port);
connection->_excp_cb(connection);
break;
}
}
}
_OnMessage(connection);
}
void EnableEvent(int sockfd, bool readable, bool writeable)
{
uint32_t events;
events |= (readable ? EPOLLIN : 0) | (writeable ? EPOLLOUT : 0) | EPOLLET;
_epoller_ptr->EpollerUpdate(EPOLL_CTL_MOD, sockfd, events);
}
void Sender(std::weak_ptr<Connection> conn)
{
auto connection = conn.lock();
auto &outbuffer = connection->Outbuffer();
int sockfd = connection->Sockfd();
while (true)
{
ssize_t n = send(sockfd, outbuffer.c_str(), outbuffer.size(), 0);
if (n > 0)
{
outbuffer.erase(0, n);
if (outbuffer.empty())
break;
}
else if (n == 0)
{
return;
}
else
{
if (errno == EWOULDBLOCK)
break;
else if (errno == EINTR)
break;
else
{
lg(Warning, "sockfd: %d, client info [%s:%d] recv error...", sockfd, connection->_ip, connection->_port);
connection->_excp_cb(connection);
break;
}
}
}
if(outbuffer.empty())
{
// 关闭对写事件的关心
EnableEvent(sockfd, true, false);
}
else
{
// 开启对写事件的关心
EnableEvent(sockfd, true, true);
}
}
void Excepter(std::weak_ptr<Connection> conn)
{
auto connection = conn.lock();
int sockfd = connection->Sockfd();
if(!IsConnectionSafe(sockfd)) return;
// 1.从epoll内核中删除当前链接的fd
_epoller_ptr->EpollerUpdate(EPOLL_CTL_DEL, sockfd, 0);
// 2.关闭当前文件描述符
close(sockfd);
lg(Debug, "sockfd : %d closed.", sockfd);
// 3.从映射中删除当前sockfd对应的链接
_connections.erase(sockfd);
lg(Debug, "remove sockfd : %d, from unordered_map.", sockfd);
}
};
se
{
// 开启对写事件的关心
EnableEvent(sockfd, true, true);
}
}
void Excepter(std::weak_ptr<Connection> conn)
{
auto connection = conn.lock();
int sockfd = connection->Sockfd();
if(!IsConnectionSafe(sockfd)) return;
// 1.从epoll内核中删除当前链接的fd
_epoller_ptr->EpollerUpdate(EPOLL_CTL_DEL, sockfd, 0);
// 2.关闭当前文件描述符
close(sockfd);
lg(Debug, "sockfd : %d closed.", sockfd);
// 3.从映射中删除当前sockfd对应的链接
_connections.erase(sockfd);
lg(Debug, "remove sockfd : %d, from unordered_map.", sockfd);
}
};