文章目录
- 高级IO(二)
- I/O多路转接之poll
- poll服务器
- I/O多路转接之epoll
- epoll相关函数
- epoll工作原理
- epoll回调机制
- epoll服务器
- epoll的优点
高级IO(二)
I/O多路转接之poll
poll也是系统提供的一个多路转接接口
- poll系统调用也可以让我们的程序同时监视多个文件描述符上的事件是否就绪
poll 函数
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
-
fds:一个poll函数监视的结构列表,每一个元素包含三个部分:文件描述符、监视的事件集合、就绪的事件集合
-
nfds : fds数组的长度
-
timeout : 表示poll函数的超时时间,单位是毫秒
-
参数调用成功,返回事件就绪的文件描述符个数,timeout时间耗尽返回0,函数调用失败返回-1
poll服务器
Socket类
#pragma once
#include <iostream>
#include <unistd.h>
#include <memory.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/types.h>
class Socket{
public:
static int SocketCreate();
static void SocketBind(int sock, int port);
static void SocketListen(int sock, int backlog);
};
int Socket::SocketCreate(){
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd < 0) {
std::cerr << "socket error" << std::endl;
exit(2);
}
// 设置端口复用
int opt = 1;
setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
std::cout << "SocketCreate success" << std::endl;
return sockfd;
}
void Socket::SocketBind(int sock, int port) {
struct sockaddr_in local;
memset(&local, 0, sizeof(local));
local.sin_family = AF_INET;
local.sin_port = htons(port);
local.sin_addr.s_addr = INADDR_ANY;
socklen_t len = sizeof(local);
if (bind(sock, (struct sockaddr*)&local, len) < 0) {
std::cerr << "bind error" << std::endl;
exit(3);
}
std::cout << "SocketBind success" << std::endl;
}
void Socket::SocketListen(int sock, int backlog) {
if (listen(sock, backlog) < 0) {
std::cerr << "listen error" << std::endl;
exit(4);
}
std::cout << "SocketListen success" << std::endl;
}
PollServer类
#pragma once
#include "socket.hpp"
#include <poll.h>
#define BACK_LOG 5
#define DFT_PORT 8889
#define POLL_CAP 1024
#define DFT_FD -1
class PollServer{
public:
~PollServer() { if (listen_sock > 0) close(listen_sock); }
static PollServer* GetInstance(int port = DFT_PORT);
void InitPollServer();
void Run();
private:
PollServer(int _port) : listen_sock(-1), port(_port){}
void ClearPollfds(struct pollfd fds[], int num, int default_fd);
bool SetPollfds(struct pollfd fds[], int num, int fd);
void HandlerEvent(struct pollfd fds[], int num);
void UnSetPoolfds(struct pollfd fds[], int index);
private:
int listen_sock;
int port;
static PollServer* instance;
};
PollServer* PollServer::instance = nullptr;
PollServer* PollServer::GetInstance(int port) {
if (instance == nullptr)
instance = new PollServer(port);
return instance;
}
void PollServer::InitPollServer() {
listen_sock = Socket::SocketCreate();
Socket::SocketBind(listen_sock, port);
Socket::SocketListen(listen_sock, BACK_LOG);
}
void PollServer::ClearPollfds(struct pollfd fds[], int num, int default_fd) {
for (int i = 0; i < num; i++) {
fds[i].fd = default_fd;
fds[i].events = 0;
fds[i].revents = 0;
}
}
bool PollServer::SetPollfds(struct pollfd fds[], int num, int fd) {
for (int i = 0; i < num; i++) {
if (fds[i].fd == DFT_FD) {
fds[i].fd = fd;
fds[i].events |= POLLIN; // 添加读事件
return true;
}
}
return false;
}
void PollServer::Run() {
struct pollfd fds[POLL_CAP];
ClearPollfds(fds, POLL_CAP, DFT_FD);
SetPollfds(fds, POLL_CAP, listen_sock);
for (; ;) {
switch(poll(fds, POLL_CAP, -1)) {
case 0:
std::cout << "timeout..." << std::endl; break;
case -1:
std::cerr << "poll error" << std::endl;
default:
HandlerEvent(fds, POLL_CAP);
break;
}
}
}
void PollServer::UnSetPoolfds(struct pollfd fds[], int index) {
fds[index].fd = DFT_FD;
fds[index].events = 0;
fds[index].revents = 0;
}
void PollServer::HandlerEvent(struct pollfd fds[], int num) {
for (int i = 0; i < num; i++) {
if (fds[i].fd == DFT_FD) continue;
if (fds[i].fd == listen_sock && fds[i].revents & POLLIN) { // 读连接
struct sockaddr_in peer;
socklen_t len = sizeof(peer);
memset(&peer, 0, sizeof(peer));
int sock = accept(listen_sock, (struct sockaddr*)&peer, &len);
if (sock < 0) {
std::cerr << "aceept error" << std::endl;
continue;
}
std::string peer_ip = inet_ntoa(peer.sin_addr);
short peer_port = ntohs(peer.sin_port);
std::cout << "get a new link -> [" << peer_ip << ":" << peer_port << "]" << std::endl;
if (!SetPollfds(fds, POLL_CAP, sock)){
close(sock);
std::cout << "poll server is full, close fd :" << sock << std::endl;
}
} else if (fds[i].revents & POLLIN) {
#define BUFFER_SIZE 1024
char buffer [BUFFER_SIZE];
ssize_t size = read(fds[i].fd, buffer, sizeof(buffer));
if (size > 0) {
buffer[size - 1] = 0;
std::cout << "echo #" << buffer << std::endl;
} else if (size == 0) {
std::cout << "client quit..." << std::endl;
close(fds[i].fd);
UnSetPoolfds(fds, i);
} else {
std::cerr << "read error" << std::endl;
close(fds[i].fd);
UnSetPoolfds(fds, i);
}
}
}
}
服务器测试
#include "poll_server.hpp"
#include <string>
void Usage(char* proc) {
std::cout << "Usage: " << proc << " port" << std::endl;
}
int main(int argc, char* argv[]) {
if (argc != 2) {
Usage(argv[0]);
exit(1);
}
int port = atoi(argv[1]);
PollServer* ps = PollServer::GetInstance(port);
ps->InitPollServer();
ps->Run();
return 0;
}
I/O多路转接之epoll
epoll也是系统提供的一个多路转接接口
- epoll系统调用也可以让我们的程序同时监视多个文件描述符上的事件是否就绪,与select和poll的定位相同,使用场景也一样
- epoll( extend poll ) ,可以理解成poll的延伸,epoll是为了同时处理大量的文件描述符改进的poll
- epoll在2.5.44内核中被引进,几乎具备了select和poll的所有优点,被公认为Linux2.6下性能最好的多路复用IO通知方法
epoll相关函数
epoll_create 函数
epoll_create 用于创建一个epoll模型
int epoll_create(int size);
int epoll_create1(int flags);
-
Linux2.6.8版本之后,size函数可以被忽略,但必须设置成大于0的值
-
epoll模型创建成功返回对应的文件描述符,失败返回-1同时设置错误码
当不再使用时,必须调用close函数关闭epoll模型响应文件描述符,当所有epoll实例的文件描述符都关闭时,内核将销毁该实例并且释放相关资源
epoll_ctl函数
epoll_ctl函数用于指定epoll模型中注册事件,该函数的原型如下
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
-
epfd : 指定epoll模型
-
op : 选项,由三个宏表示
-
fd : 需要监视的文件描述符
-
event : 需要监视该文件上的拿一些事件
-
函数调用成功返回0,失败返回-1,同时设置错误码
option 三个选项
EPOLL_CTL_ADD
:注册新的文件描述符到指定的epoll模型中EPOLL_CTL_MOD
:修改已经注册的文件描述符中的监听事件EPOLL_CTL_DEL
:从epoll模型中删除指定的文件描述符
event 参数对应的struct epoll_event结构体
typedef union epoll_data {
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;
struct epoll_event {
uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};
struct epoll_event有两个成员,第一个成员表示需要监视的事件,第二个成员data是一个联合体结构,一般使用该结构体中的fd,表示需要监听的文件描述符
EPOLLIN
The associated file is available for read(2) operations.
# 表示对应的文件描述符可以读(包括对端socket正常关闭)
EPOLLOUT
The associated file is available for write(2) operations.
# 表示对应的文件描述符可以写
EPOLLRDHUP (since Linux 2.6.17)
Stream socket peer closed connection, or shut down writing half of connection.
(This flag is especially useful for writing simple code to detect peer shutdown
when using Edge Triggered monitoring.)
EPOLLPRI
There is urgent data available for read(2) operations.
# 表示对应文件描述符有进击数据可读(外带数据到来)
EPOLLERR
Error condition happened on the associated file descriptor. epoll_wait(2) will
always wait for this event; it is not necessary to set it in events.
# 表示对应的文件描述符发送错误
EPOLLHUP
Hang up happened on the associated file descriptor. epoll_wait(2) will always
wait for this event; it is not necessary to set it in events.
# 表示对应的文件描述符被挂断了,即对端文件描述符关闭了
EPOLLET
Sets the Edge Triggered behavior for the associated file descriptor. The
default behavior for epoll is Level Triggered. See epoll(7) for more detailed
information about Edge and Level Triggered event distribution architectures.
# 将epoll的工作方式设置成边缘触发(Edge Triggered)模式
EPOLLONESHOT (since Linux 2.6.2)
Sets the one-shot behavior for the associated file descriptor. This means that
after an event is pulled out with epoll_wait(2) the associated file descriptor
is internally disabled and no other events will be reported by the epoll inter‐
face. The user must call epoll_ctl() with EPOLL_CTL_MOD to rearm the file
descriptor with a new event mask.
# 只监听一次事件,当监听完这次事件后,就将该文件描述符移出模型
这些数值都是由宏定义的,它们的二进制序列中有且只有一个比特位是1,且为1的比特位是各不相同的,可以才epoll.h文件中查看
epoll_wait 函数
epoll_wait函数用于收集监视的事件中已经就绪的事件,该函数的函数原型如下
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
int epoll_pwait(int epfd, struct epoll_event *events, int maxevents, int timeout, const sigset_t *sigmask);
-
epfd : 指定的epoll模型
-
events: 内核会将已经就绪的事件拷贝到events数组当中(events不能是空指针,内核只负责将就绪的事件拷贝到events数组当中(内核只负责将就绪的事件拷贝到数组中,不会帮我们在用户态开辟内存,所以必须要提前开好空间,并且events绝对不能传空指针)
-
maxevents : events数组元素的个数,该值不能大于创建epoll模型size值
-
timeout: 表示epoll_wait函数的可超时事件,单位是毫秒
-
如果函数调用成功,返回事件就绪的文件描述符个数,如果时间耗尽返回0,如果调用失败返回-1,并设置错误码
timeout 的取值
- -1 : epoll_wait 进行阻塞式等待,直到被监视的某个文件描述符上的某个事件就绪
- 0 : epoll_wait调用后进行非阻塞等待,无论监视的文件描述符上的事件是否就绪,epoll_wait检测后都会立即返回
- 特定的事件: epoll_wait会在特定的时间内阻塞等待,如果就绪则直接返回,如果一直没有事件就绪则会超时返回
epoll_wait调用失败,错误码可能被设置为
EBADF
: 传入epoll模型的对应文件描述符无效EFAULT
: events指向的数组空间无法通过写入权限访问EINTR
: 此调用被信号中断EINVAL
: epfd不是一个epoll模型对应的文件描述符,或传入maxevents值小于零
epoll工作原理
当某一进程调用epoll_create
函数时,Linux内核会创建一个eventpoll结构体,也就是我们所说的epoll模型,eventpoll结构体当中的成员rbr和rdlist与epoll的使用方式密切相关
rbr 和 rdlist实际就是一颗红黑树和一个就绪队列
struct eventpoll{
// 红黑树的根结点,用于存储所有添加到epoll模型需要监视的文件描述符和事件
struct rb_root rbr;
// 就绪队列中存放的是将要通过epoll_wait返回给用户的满足条件的事件
struct list_head rdlist;
// 等待队列,多个执行流想同时访问一个epoll模型在此等待
struct list wait_queue;
}
- epoll模型中的红黑树本质就是告诉内核,需要监视哪些文件描述符上的哪些事件,调用epoll_ctl函数实际就是对这棵树进行增删改操作
- epoll模型当中的就绪队列就是告诉内核,哪些文件描述符上的哪些时间已经就绪,调用epoll_wait函数实际就是从就绪队列中获取已经就绪的事件
在epoll模型中,每一个事件都会有一个对应的epitem结构体,红黑树和就绪队列中的结点分别基于epitem结构中 的rbn成员和rdllink成员的,epitem结构体中的成员ffd记录的是指定的文件描述符值,event成员记录的就是文件描述符对应的事件
struct epitem{
struct rb_node rbn; // 红黑树结点
struct list_head rdllink; // 双向链表的结点
struct epoll_filefd ffd; // 事件的句柄信息
struct eventpoll *ep; // 指向其所属的eventpoll对象
struct epoll_event event; // 期待发生的事件类型
}
- 对于epitem结构中的rbn成员来说,ffd与event的含义是,需要监视ffd上的event事件是否就绪
- 对于epitem结构中的rdlink成员来说,ffd与event的含义是,ffd上的event事件已经准备就绪
调用epoll_ctl向红黑树中插入结点时,如果设置了EPOLLONESHOT
选项,当监听完这次事件后,就会将这个结点从二叉树中删除。如果没有设置这个选项,则该结点就会一直存在,除非用户调用epoll_ctl
将该节点从二叉树中删除
epoll回调机制
所有添加到红黑树当中的事件,都会与设备(网卡)驱动程序建立回调方法,这个回调方法在内核中叫做ep_poll_callback
- 对于select和poll来说,操作系统需要监视多个文件描述符,并主动对多个文件描述符进行轮询检测,这一定会增加操作系统负担
- 对于epoll来说,操作系统不需要主动检测,当事件就绪时,会自动调用对应的回调方法,将就绪的事件插入到队列当中即可
- 当用户调用epoll_wait函数获取事件,只需要关注底层的就绪队列是否为空即可,如果不为空就拷贝给用户
采用回调机制的最大好处就是不需要再让操作系统主动对事件是否就绪进行检测了,事件就绪后会自动调用回调函数进行处理
- 事件不断就绪,会不断调用回调方法向就绪队列中插入对应结点,上层也会不断通过epoll_wait函数从就绪队列中获取结点,这是典型的生产消费模型
- 有序就绪队列会被多个执行流同时访问,所以需要使用互斥锁对齐进行保护,epoll当中就有mtx来保护临界资源,epoll本身时线程安全的
- 所以eventpoll中还应该有一个等待队列,当多个执行流向同时访问一个epoll模型时,就需要再等待队列下进行等待
epoll服务器
Socket 类
这个类已经实现过很多次了,这里就不多赘述了
#pragma once
#include <iostream>
#include <unistd.h>
#include <memory.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/types.h>
class Socket{
public:
static int SocketCreate();
static void SocketBind(int sock, int port);
static void SocketListen(int sock, int backlog);
};
int Socket::SocketCreate(){
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd < 0) {
std::cerr << "socket error" << std::endl;
exit(2);
}
// 设置端口复用
int opt = 1;
setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
std::cout << "SocketCreate success" << std::endl;
return sockfd;
}
void Socket::SocketBind(int sock, int port) {
struct sockaddr_in local;
memset(&local, 0, sizeof(local));
local.sin_family = AF_INET;
local.sin_port = htons(port);
local.sin_addr.s_addr = INADDR_ANY;
socklen_t len = sizeof(local);
if (bind(sock, (struct sockaddr*)&local, len) < 0) {
std::cerr << "bind error" << std::endl;
exit(3);
}
std::cout << "SocketBind success" << std::endl;
}
void Socket::SocketListen(int sock, int backlog) {
if (listen(sock, backlog) < 0) {
std::cerr << "listen error" << std::endl;
exit(4);
}
std::cout << "SocketListen success" << std::endl;
}
EpollServer 类
EpollServer类当中除了需要包含服务器绑定的端口号以及监听套接字,还需要增加一个用于描述epoll模型对应的文件描述符的成员变量
#include "socket.hpp"
#include <sys/epoll.h>
#define EPOLL_CAP 1024
#define DEFAULT_PORT 8889
#define BACK_LOG 5
class EpollServer{
public:
static EpollServer* GetInstance(int _port = DEFAULT_PORT); // 获取单例
~EpollServer(); // 析构函数
void EpollServerInit(); // 服务器初始化
void Run();
private:
EpollServer(int _port) : port(_port), listen_sock(-1), epoll_fd(-1){} // 构造函数
void AddEvent(int sock, uint32_t event);
void DelEvent(int sock);
void HandlerEvent(struct epoll_event revs[], int num);
private:
int port;
int listen_sock;
int epoll_fd;
static EpollServer* instance; // 用于创建单例模式
};
EpollServer* EpollServer::instance = nullptr;
EpollServer* EpollServer::GetInstance(int _port) {
if (instance == nullptr) {
instance = new EpollServer(_port);
}
return instance;
}
EpollServer::~EpollServer(){
if (listen_sock > 0) close(listen_sock);
if (epoll_fd > 0) close(epoll_fd);
}
void EpollServer::EpollServerInit() {
listen_sock = Socket::SocketCreate();
Socket::SocketBind(listen_sock, port);
Socket::SocketListen(listen_sock, BACK_LOG);
epoll_fd = epoll_create(EPOLL_CAP);
if (epoll_fd < 0) {
std::cerr << "epoll_create error" << std::endl;
exit(5);
}
}
运行服务器
void EpollServer::AddEvent(int sock, uint32_t event) {
struct epoll_event ev;
ev.events = event;
ev.data.fd = sock;
epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sock, &ev);
}
void EpollServer::Run(){
AddEvent(listen_sock, EPOLLIN);
for (;;){
struct epoll_event revs[EPOLL_CAP]; // 用于接收已经就绪的事件
int num = epoll_wait(epoll_fd, revs, EPOLL_CAP, -1);
if (num < 0) {
std::cerr << "epoll_wait error" << std::endl;
} else if (num == 0) {
std::cout << "time out..." << std::endl;
} else {
HandlerEvent(revs, num); // 事件处理
}
}
}
- 默认情况下,只要有底层就绪事件没有处理,epoll就会一直通知用户,也就是调用epoll_wait会一直成功返回,并将就绪的事件拷贝到传入的数组中。
- 事件处理并非是将就绪队列中的数据拷贝到用户层,比如套接字的读事件就绪,需要调用accept获取到底层连接才算是处理完事件
事件处理
- 调用epoll_wait得到的返回值来判断操作系统向revs数组中拷贝了多少个struct epoll_event 结构,进而一个个获取事件进行处理
void EpollServer::AddEvent(int sock, uint32_t event) {
struct epoll_event ev;
ev.events = event;
ev.data.fd = sock;
epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sock, &ev);
}
void EpollServer::DelEvent(int sock) {
epoll_ctl(epoll_fd, EPOLL_CTL_DEL, sock, nullptr);
}
void EpollServer::HandlerEvent(struct epoll_event revs[], int num) {
for (int i = 0; i < num; i++) {
if (revs[i].data.fd == listen_sock && revs[i].events & EPOLLIN) {
struct sockaddr_in peer;
socklen_t len = sizeof(peer);
int sock = accept(listen_sock, (struct sockaddr*)&peer, &len);
if (sock >= 0) {
std::string client_ip = inet_ntoa(peer.sin_addr);
short client_port = ntohs(peer.sin_port);
std::cout << "get a new link [" << client_ip << ":" << client_port << std::endl;
AddEvent(sock, EPOLLIN);
} else {
std::cerr << "accept error" << std::endl;
continue;
}
} else if (revs[i].events & EPOLLIN) {
#define BUFFER_SIZE 1024
char buffer[BUFFER_SIZE];
ssize_t size = read(revs[i].data.fd, buffer, sizeof(buffer) - 1);
if (size > 0) {
buffer[size - 1] = 0;
std::cout << "echo # " << buffer << std::endl;
} else if (size == 0) {
std::cout << "client quit..." << std::endl;
DelEvent(revs[i].data.fd);
close(revs[i].data.fd);
}
else {
std::cerr << "recv error" << std::endl;
close(revs[i].data.fd);
DelEvent(revs[i].data.fd);
}
}
}
}
epoll 服务器测试
#include "epoll_server.hpp"
#include <string>
static void Usage(std::string proc) {
std::cout << "Usage: " << proc << " port " << std::endl;
}
int main(int argc, char* argv[]) {
if (argc != 2) {
Usage(argv[0]);
exit(1);
}
int port = atoi(argv[1]);
EpollServer* es = EpollServer::GetInstance(port);
es->EpollServerInit();
es->Run();
return 0;
}
这里我们编写的时单进程epoll服务器,但是其可以位多个客户端提供服务。我们可以使用ls /proc/PID/fd
命令没查看epoll服务器的文件描述符的使用情况
[clx@VM-20-6-centos poll_server]$ ll /proc/22697/fd
total 0
lrwx------ 1 clx clx 64 Jul 9 22:15 0 -> /dev/pts/5
lrwx------ 1 clx clx 64 Jul 9 22:20 1 -> /dev/pts/5
lrwx------ 1 clx clx 64 Jul 9 22:15 2 -> /dev/pts/5
lrwx------ 1 clx clx 64 Jul 9 22:20 3 -> socket:[1478855763]
lrwx------ 1 clx clx 64 Jul 9 22:20 4 -> anon_inode:[eventpoll]
lrwx------ 1 clx clx 64 Jul 9 22:20 5 -> socket:[1478878270]
[clx@VM-20-6-centos poll_server]$
epoll的优点
- 接口使用方便,使用起来非常方便高效
- 数据轻量拷贝:只在新增监视事件需要调用epoll_ctl将数据拷贝到内核。而select 和 poll每次都要重新将所有需要监视的事件从用户拷贝到内核。此外调用epoll_wait获取就绪事件时,只会拷贝就绪的事件,不会进行不必要的拷贝擦欧总
- 事件的回调机制:避免操作系统主动进行轮询检测事件就绪,而是采用回调函数机制,将文件描述符放入到就绪队列中。调用epoll_wait时候直接访问就绪队列就知道哪些文件描述符已经就绪,检测是否有文件描述符的时间复杂度是O(1),只要判断就绪队列是否为空即可
- 没有数量限制,如果内存允许,就可以一直向红黑树中增加结点