本篇主要内容:
- 一、Linux的文件描述符
- 二、多路IO转接(上)
- 1.基础版多路IO转接select
- ▶ 关于select( )函数
- ▶ select( )改写上篇案例
- 2.加强版多路IO转接poll
- 3.顶配版多路IO转接epoll
- ▶ epoll相关函数
- (1)创建监听红黑树
- (2)操作监听红黑树
- (3)等待监听
- ▶ epoll改写上篇案例
- 三、多路IO转接(下)
- 1.epoll的两种触发模式
- 2.epoll反应堆设计思想
一、Linux的文件描述符
- 文件描述符 0:标准输入,代表进程从终端或管道中读取数据;
- 文件描述符 1:标准输出,代表进程向终端或管道输出数据;
- 文件描述符 2:标准错误输出,代表进程向终端或管道输出错误信息;
- 文件描述符 3:代表进程打开的第一个文件,可以通过dup()函数复制到其他文件描述符上使用;
- 文件描述符 4:代表进程打开的第二个文件,同样可以通过dup()函数复制到其他文件描述符上使用;
这些特殊的文件描述符可以让进程进行更加灵活的 I/O 操作,例如将标准错误输出重定向到文件中,或者在不关闭已打开的文件的情况下执行其他操作等。内核自动从0开始(一般最大默认1024,可以通过命令ulimit -n查看),为进程所打开的文件分配文件描述符
二、多路IO转接(上)
1.基础版多路IO转接select
▶ 关于select( )函数
select( ) 是一种多路 I/O 转接技术,它可以同时监控多个 I/O 状态,能够实现高效的事件驱动(event-driven)程序设计。它灵活性高、跨平台性好:select() 函数是POSIX标准中规定的函数,支持Linux、Unix、MacOS、类Unix等平台。poll()和epoll()两种多路IO转接都只适用于Linux
,但是由于每次调用select( )都需要将所有文件描述符在用户态和内核态之间复制拷贝,且select依赖轮询机制监听事件,因此当监听的文件描述符数量过大时效率较低,此外select( )只支持最大文件描述符1024个,可扩展性较差。
▶ select( )改写上篇案例
PS:服务器绑定监听等代码封装在了头文件中,这里主要体现select()的使用
#include "net.h"
int main() {
struct ServerSocket ss = {
.serverListen = serverListen,
.socketBind = socketBind,
.serverAccept = serverAccept
};
ss.sockfd = socket(AF_INET, SOCK_STREAM, 0);
ss.socketBind(ss.sockfd, "192.168.35.128", 8880);
ss.serverListen(ss.sockfd, 128);
/* socket创建、bind绑定等其它已经封装到了net.h头文件
* 这里主要体现多路IO转接select()的使用 */
int maxfd = ss.sockfd; //最大文件描述符
int retn = 0;
fd_set readSet, tempSet; //读事件集合,临时的集合(临时备份readSet)
FD_ZERO(&tempSet); //清0
FD_SET(ss.sockfd, &tempSet); //把server用于监听的fd加入tempSet
int nread, i;
char *readBuff = (char *) malloc(128); //读buff
FILE *buffFile = NULL; //文件流
struct Data data;
char head[8];
struct ClientSocket cs;
while (1) {
readSet = tempSet;
retn = select(maxfd + 1, &readSet, NULL, NULL, NULL);
if (retn == -1) {
perror("select");
exit(-2);
}
if (FD_ISSET(ss.sockfd, &readSet)) { //判断ss.sockfd是否在readSet里(满足读事件则客户端接入)
/* 调用accept(),并更新maxfd,tempSet后进入下一次循环*/
cs = ss.serverAccept(ss.sockfd);
FD_SET(cs.cfd, &tempSet);
printf("<server> client connected (%s:%d)\n", cs.ip, cs.port);
if (maxfd < cs.cfd)maxfd = cs.cfd;
continue; //这里可if(retn == 1)continue;
}
/* 循环遍历符合读事件的fd */
for (i = ss.sockfd + 1; i <= maxfd; i++) {
if (FD_ISSET(i, &readSet)) {
/* 找到满足的读事件fd */
nread = read(i, readBuff, 128); //读取客户端发过来的命令
/* 对read判空,防止客户端退出后一直收空数据的死循环 */
if (nread == 0) {
printf("<server> client disconnected (%s:%d)\n", cs.ip, cs.port);
FD_CLR(i, &tempSet);
close(i);
continue;
}
/* 执行客户端发过来的命令 */
buffFile = popen(readBuff, "r"); //命令执行成功结果读取到writeBuff
data = dataDealWith(buffFile);
sprintf(head, "%ld", data.dataLenth);
write(i, head, 8);
write(i, data.dataBody, data.dataLenth);
memset(readBuff, '\0', 128);
memset(&data, 0, sizeof(data));
pclose(buffFile);
}
}
}
}
2.加强版多路IO转接poll
- poll只在select的基础上有一定的优化,但并未解决其他缺点带来的并发性限制,也不适用于高并发场景
3.顶配版多路IO转接epoll
▶ epoll相关函数
- epoll是Linux下最常用、最高效的IO多路复用函数,它可以通过内核事件驱动机制实现对大量套接字(上万个)进行监听,底层利用红黑树和链表来存储并快速查找需要监视的文件描述符以及对应的事件,避免了传统系统调用(如 select、poll)中使用轮询方式进行遍历的缺点,提高了效率。(redis、nginx、java NIO底层均使用epoll)
- epoll相对于select( )和poll( )具有的优势:
- 支持较大的文件描述符和连接数
(上限是整个系统打开文件的最大值)
。 - 支持更高效的事件通知处理机制。
(时间复杂度为O(1))
- 支持ET模式和LT模式两种工作模式。
目前被广泛地使用于各种高并发服务器程序中,redis、nginx等
(1)创建监听红黑树
(2)操作监听红黑树
(3)等待监听
▶ epoll改写上篇案例
#include "net.h"
#include <sys/epoll.h>
int main() {
struct ServerSocket ss = {
.serverListen = serverListen,
.socketBind = socketBind,
.serverAccept = serverAccept
};
ss.sockfd = socket(AF_INET, SOCK_STREAM, 0);
ss.socketBind(ss.sockfd, "192.168.35.128", 8880);
ss.serverListen(ss.sockfd, 128);
/* socket创建、bind绑定等其它已经封装到了net.h头文件
* 这里主要体现多路IO转接epoll的使用 */
struct ClientSocket cs;
int i, retn;
int epoll_fd = epoll_create(128);
if (epoll_fd < 0) {
perror("epool_create");
exit(-1);
}
struct epoll_event events[16];
char *readBuff = (char *) malloc(128); //读buff
FILE *buffFile = NULL; //文件流
struct Data data;
char head[8];
int nread = 0;
/* 创建监听读事件红黑树,并将ss.sockfd添加进去 */
struct epoll_event event = {
.data.fd = ss.sockfd,
.events = EPOLLIN
};
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, ss.sockfd, &event) < 0) {
perror("epoll_ctl");
exit(-1);
}
/* 等待读事件发生,判断是客户端接入还是已接入客户端发生数据 */
while (1) {
retn = epoll_wait(epoll_fd, events, 16, -1);
if (retn < 0) {
perror("epoll_wait");
exit(-1);
} else {
/* 轮询 */
for (i = 0; i < retn; i++) {
if (events[i].data.fd == ss.sockfd) { //客户端接入
cs = ss.serverAccept(ss.sockfd);
event.data.fd = cs.cfd;
event.events = EPOLLIN;
epoll_ctl(epoll_fd, EPOLL_CTL_ADD, cs.cfd, &event);
printf("<server> client connected (%s:%d)\n", cs.ip, cs.port);
continue;
}else{ //已接入读客户端写数据
nread = read(events[i].data.fd, readBuff, 128); //读取客户端发过来的命令
/* 对read判空,防止客户端退出后一直收空数据的死循环 */
if (nread == 0) {
printf("<server> client disconnected (%s:%d)\n", cs.ip, cs.port);
epoll_ctl(epoll_fd, EPOLL_CTL_DEL, events[i].data.fd, &event);
close(events[i].data.fd);
}
/* 执行客户端发过来的命令 */
buffFile = popen(readBuff, "r"); //命令执行成功结果读取到writeBuff
data = dataDealWith(buffFile);
sprintf(head, "%ld", data.dataLenth);
write(events[i].data.fd, head, 8);
write(events[i].data.fd, data.dataBody, data.dataLenth);
memset(readBuff, '\0', strlen(readBuff));
memset(&data, 0, sizeof(data));
pclose(buffFile);
}
}
}
}
}
三、多路IO转接(下)
前面只是通过改写案例熟悉epoll的几个相关函数,并初步认识epoll的使用,实际上,epoll相比于select和poll效率更高,可以显著提高服务器的性能,降低系统开销。但是,epoll在应用中使用起来比较复杂,需要合理地设计数据结构和回调机制,才能发挥其最大的优势
1.epoll的两种触发模式
2.epoll反应堆设计思想
epoll是一种高效的I/O事件通知机制,常用于网络编程中。其设计思想是基于内核态与用户态的交互方式,使用了红黑树(Red-Black Tree)数据结构来维护被监听的描述符集合,进而避免了遍历文件描述符导致的性能问题。当有 I/O 事件发生时,epoll就会通知应用程序进行处理,从而实现了高效地I/O处理。同时,epoll还支持边缘触发模式和水平触发模式,可以根据不同需求进行选择。更多详细,可见Bilibili黑马C++课程
核心:epoll ET模式 + 非阻塞IO + 回调函数
【main.c】
#include "net.h"
/* 文件描述符事件(fd对应事件对应的回调) */
struct Fd_Event {
int fd; //lfd或cfd
int event; //监听事件
void *arg; //回调函数void *参数
int (*callBack)(ServerSocket ss, int fd, int event, void *arg, void *retnEvents); /* 回调函数 */
int epoll_fd; //红黑树句柄
int status; //节点状态(0-不在监听、1-在监听)
};
/* 设置文件描述符为非阻塞并返回 */
int setNonblock(int fd) {
int flag = fcntl(fd, F_GETFL, 0); //获取当前flag
if (flag < 0) {
perror("fcntl");
exit(-1);
}
flag = flag | O_NONBLOCK; //获取非阻塞flag
fcntl(fd, F_SETFL, flag); //设置非阻塞
return fd;
}
/* 初始化服务器socket(创建socket、端口复用、绑定、监听) */
int initServerSocket(ServerSocket ss, char *ip, int port) {
printf("<Server> init...\n");
ss.sockfd = socket(AF_INET, SOCK_STREAM, 0);
int optval = 1;
setsockopt(ss.sockfd, SOL_SOCKET, SO_REUSEPORT, &optval, sizeof(optval));
ss.socketBind(ss.sockfd, ip, port);
ss.serverListen(ss.sockfd, 128);
return ss.sockfd;
}
/***************************************************************
函数作用:初始化lfd,设置非阻塞、初始化Fd_event结构体、上树
函数参数:int lfd 监听文件描述符
int epoll_fd 监听红黑树fd
struct Fd_Event *lfd_event 自定义的结构体类型(传出参数)
int (*callBack)() 函数指针(传回调函数)
***************************************************************/
int initListenFd(int lfd, int epoll_fd, struct Fd_Event *lfd_event,
int (*callBack)(ServerSocket, int, int, void *, void *)) {
/* 设置lfd非阻塞 */
lfd = setNonblock(lfd);
/* 初始化lfd的Fd_event结构体 */
lfd_event->epoll_fd = epoll_fd;
lfd_event->fd = lfd;
lfd_event->event = EPOLLIN | EPOLLET;
lfd_event->callBack = callBack;
lfd_event->status = 0;
/* 添加到监听红黑树 */
struct epoll_event ev = {
.data.ptr = lfd_event, //data.ptr(联合体成员void *)指向Fd_event
.events = lfd_event->event
};
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, lfd, &ev) < 0) {
perror("epoll_ctl");
return -1;
}
lfd_event->status = 1;
return lfd;
}
/***************************************************************
函数作用:初始化lfd(设置非阻塞、初始化Fd_event结构体、上树)
函数参数:int lfd 监听文件描述符
int epoll_fd 监听红黑树fd
struct Fd_Event *cfd_event 自定义的结构体类型(传出参数)
int (*callBack)() 函数指针(传回调函数)
PS:同上initListenFd()
***************************************************************/
int initConnectFd(int cfd, int epoll_fd, struct Fd_Event *cfd_event, int event,
int (*callBack)(ServerSocket, int, int, void *, void *)) {
cfd = setNonblock(cfd);
cfd_event->epoll_fd = epoll_fd;
cfd_event->fd = cfd;
cfd_event->event = event;
cfd_event->callBack = callBack;
cfd_event->status = 0;
struct epoll_event ev = {
.data.ptr = cfd_event,
.events = cfd_event->event
};
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, cfd, &ev) < 0) {
perror("epoll_ctl");
return -1;
}
cfd_event->status = 1;
return cfd;
}
/* cfd的回调函数实现(这里主要做收发数据)*/
int cfdEventHandler(ServerSocket ss, int fd, int event, void *arg, void *retnEvent) {
char *readBuff = (char *) malloc(128);
char *head = (char *) malloc(8);
FILE *buffFile = NULL;
struct Fd_Event *temp = (struct Fd_Event *)arg;
int nread = read(fd, readBuff, 128); //读取客户端发过来的命令
/* 对read判空,防止客户端退出后一直收空数据的死循环 */
if (nread == 0) {
printf("<server> client disconnected \n");
epoll_ctl(temp->epoll_fd,EPOLL_CTL_DEL,fd,NULL);
close(fd);
}
/* 执行客户端发过来的命令 */
buffFile = popen(readBuff, "r"); //命令执行成功结果读取到writeBuff
struct Data data = dataDealWith(buffFile);
sprintf(head, "%ld", data.dataLenth);
write(fd, head, 8); //先发8个字节的头部
write(fd, data.dataBody, data.dataLenth);
pclose(buffFile);
free(readBuff);
free(head);
return 1;
}
/* lfd回调函数(建立连接获得cfd,再初始化cfd) */
/* retnEvent保存cfd的Fd_Event结构体数组。 */
int lfdEventHandler(ServerSocket ss, int fd, int event, void *arg, void *retnEvent) {
int i;
struct Fd_Event *lfd_Event = (struct Fd_Event *) arg;
ClientSocket cs = ss.serverAccept(ss.sockfd);
printf("<Server> client connected(%s:%d)\n", cs.ip, cs.port);
struct Fd_Event *ptemp = (struct Fd_Event *)retnEvent;
for(i=0;i<1024;i++){
if(ptemp[i].status == 0)break;
}
//printf("epollfd=%d\n",lfd_Event->epoll_fd);
initConnectFd(cs.cfd, lfd_Event->epoll_fd,&ptemp[i],event,cfdEventHandler);
return cs.cfd;
}
int main() {
int lfd;
int nready, i;
ServerSocket ss = {
.serverListen = serverListen,
.socketBind = socketBind,
.serverAccept = serverAccept
};
lfd = initServerSocket(ss, "192.168.35.128", 8880);
ss.sockfd = lfd;
int epoll_fd = epoll_create(128);
if (epoll_fd < 0) {
perror("epoll_create");
exit(-1);
}
struct Fd_Event lfd_event;
struct Fd_Event cfd_retnEvent[1024]; //传出参数:存就绪的cfd
for(int j=0;j<1024;j++){
cfd_retnEvent[j].status = 0;
}
struct epoll_event events[1024];
initListenFd(lfd, epoll_fd, &lfd_event, lfdEventHandler);
while (1) {
nready = epoll_wait(epoll_fd, events, 1024, 1000);
if (nready < 0) {
perror("epoll_wait");
exit(-1);
}
for (i = 0; i < nready; i++) {
struct Fd_Event *ptemp = (struct Fd_Event *) events[i].data.ptr;
//printf("ptemp->fd = %d\n", ptemp->fd);
if (ptemp->event & EPOLLIN) {
if (ptemp->fd == lfd){
ptemp->callBack(ss, lfd, EPOLLIN | EPOLLET, (void *) (&lfd_event), (void *)cfd_retnEvent);
}else{ //cfd读事件
ptemp->callBack(ss, ptemp->fd, EPOLLIN | EPOLLET, (void *) cfd_retnEvent, NULL);
}
} else if (ptemp->event & EPOLLOUT) { //cfd写事件
ptemp->callBack(ss, ptemp->fd, EPOLLOUT | EPOLLET, (void *) cfd_retnEvent, NULL);
}
}
}
}
/*******************************************************************
* 实际上,程序还存在一些小BUG,这里仅简单使用epoll反应堆的代码设计思想 *
* 关于更多epoll反应堆或回调的巧妙编程设计思想可详细阅读开源的libevent库 *
********************************************************************/
【net.h】
#ifndef _NET_H_
#define _NET_H_
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <string.h>
#include <sys/shm.h>
#include <signal.h>
#include <sys/epoll.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/sem.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <sys/types.h>
#include <sys/wait.h>
/* 客户端socket结构体 */
typedef struct {
int cfd; //建立连接的socket文件描述符
char ip[32]; //客户端IP
int port; //客户端Port
}ClientSocket ;
/* 服务器socket结构体 */
typedef struct{
int sockfd; //服务器socket文件描述符
void (*socketBind)(int, char *, int); //给sockfd绑定地址函数
void (*serverListen)(int, int); //监听sockfd函数
ClientSocket (*serverAccept)(int); //建立连接函数
} ServerSocket ;
/* 服务器socket绑定地址信息函数实现 */
void socketBind(int sockfd, char *ip, int port) {
int retn;
/* 初始化地址结构体sockaddr_in */
struct sockaddr_in serAddr = {
.sin_family = AF_INET,
.sin_port = htons(port)
};
inet_pton(AF_INET, ip, &serAddr.sin_addr.s_addr);
/* 调用bind()绑定地址 */
retn = bind(sockfd, (struct sockaddr *) &serAddr, sizeof(serAddr));
if (retn == -1) {
perror("bind");
exit(-1);
}
printf("<Server> bind address.(%s:%d)\n", ip, port);
}
/* 服务器socket监听函数实现 */
void serverListen(int sockfd, int n) {
int retn;
retn = listen(sockfd, n);
if (retn == -1) {
perror("listen");
exit(-1);
}
printf("<Server> listening...\n");
}
/* 服务器建立连接函数实现,返回值为struct ClientSocket结构体 *
* (包括建立连接的socket文件描述符、客户端信息) */
ClientSocket serverAccept(int sockfd) {
struct sockaddr_in clientAddr;
socklen_t addrLen = sizeof(clientAddr);
ClientSocket c_socket;
c_socket.cfd = accept(sockfd, (struct sockaddr *) &clientAddr, &addrLen);
if (c_socket.cfd == -1) {
perror("accept");
exit(-1);
} else {
c_socket.port = ntohs(clientAddr.sin_port);
inet_ntop(AF_INET, &clientAddr.sin_addr.s_addr, c_socket.ip, sizeof(clientAddr));
return c_socket;
}
}
/* 数据结构体 */
struct Data {
int headerLenth; //数据头部长度
long dataLenth; //数据长度(命令执行成功的结果长度)
char *dataBody; //数据正文(命令执行成功的结果)
};
/* 处理数据的函数,返回值为struct Data */
struct Data dataDealWith(FILE *file){
char *tempBuff = (char *)malloc(8192); //临时buff
long readBytes = 0; //读取的字节数
struct Data data = {
.dataLenth = 0,
.dataBody = NULL
};
/* 处理数据:计算数据正文大小,并保留管道中的数据到data.dataBody(需要动态调整大小) */
while(fread(tempBuff,sizeof(char),8192,file) > 0){
readBytes = strlen(tempBuff)+1; //读到临时buff的字节数
data.dataLenth += readBytes; //数据长度累加readBytes
if(data.dataLenth <= readBytes){ //如果数据长度小于设置的tempBuff大小,直接拷贝
data.dataBody = (char *)malloc(readBytes);
strcpy(data.dataBody,tempBuff);
}else if(data.dataLenth > readBytes){ //如果数据长度大于设置的tempBuff大小,扩容后拼接到后面
data.dataBody = realloc(data.dataBody,data.dataLenth);
strcat(data.dataBody,tempBuff);
}
data.dataBody[strlen(data.dataBody)+1] = '\0';
memset(tempBuff,'\0',8192);
}
free(tempBuff); //释放临时buff
return data;
}
#endif