所以,我们调用 select 会把所有要管理的 socket 的 fd (文件描述符,Linux下皆为文件,简单理解就是通过 fd 能找到这个 socket)传到内核中。
此时,要遍历所有 socket,看看是否有感兴趣的事件发生。如果没有一个 socket 有事件发生,那么 select 的线程就需要让出 cpu 阻塞等待,这个等待可以是不设置超时时间的死等,也可以是设置 timeout 的有超时时间的等待。
假设此时客户端发送了数据,网卡接收到的数据塞到对应的 socket 的接收队列中,此时 socket 知道来数据了,那如何唤醒 select 呢?
其实每个 socket 有个属于自己的睡眠队列,select 会安排一个内应,即在被管理的 socket 的睡眠队列里面塞入一个 entry。
当 socket 接收到网卡的数据后,就会去它的睡眠队列里遍历 entry,调用 entry 设置的 callback 方法,这个 callback 方法里就能唤醒 select !
所以 select 在每个被它管理的 socket 的睡眠队列里都塞入一个与它相关的 entry,这样不论哪个 socket 来数据了,它立马就能被唤醒然后干活!
但是,select 的实现不太好,因为唤醒的 select 此时只知道来活了,并不知道具体是哪个 socket 来数据了,所以只能傻傻地遍历所有 socket ,看看到底是哪个 scoket 来活了,然后把所有来活的 socket 封装成事件返回。
这样用户程序就能获得发生的事件,然后进行 I/O 和业务处理了。
这就是 select 的实现逻辑,理解起来应该不难。
这里再提一嘴 select 的限制,因为被管理的 socket fd 需要从用户空间拷贝到内核空间,为了控制拷贝的大小而做了限制,即每个 select 能拷贝的 fds 集合大小只有1024。
然后要改的话只能修改宏..再重新编译内核。网上很多文章都是这样说的,但是(没错有个但是)。
poll
poll 这玩意相比于 select 主要就是优化了 fds 的结构,不再是 bit 数组了,而是一个叫 pollfd 的玩意,反正就是不用管啥 1024 的限制了。
epoll(重点)
相信看了 select 的实现,我们稍微思考下,就能想出几个可以优化的点。
比如,为什么每次 select 需要把监控的 fds 传输到内核里?不能在内核里维护个?
为什么 socket 只唤醒 select,不能告诉它是哪个 socket 来数据了?
epoll 主要就是基于上面两点做了优化。
首先,搞了个叫 epoll_ctl 的方法,这方法就是用来管理维护 epoll 所监控的哪些 socket。
如果你的 epoll 要新加一个 socket 来管理,那就调用 epoll_ctl,要删除一个 socket 也调用 epoll_ctl,通过不同的入参来控制增删改。
epoll_ctl(epfd,EPOLL_CTL_ADD,clientfd,&ev);//添加
epoll_ctl(epfd,EPOLL_CTL_DEL,clientfd,&ev);//删除
这样,在内核里面就维护了此 epoll 管理的 socket 集合,这样就不用每次调用的时候都得把所有管理的 fds 拷贝到内核了。
对了,这个 socket 集合是用红黑树实现的。
然后和 select 类似,每个 socket 的睡眠队列里都会加个 entry,当每个 socket 来数据之后,同样也会调用 entry 对应的 callback。
与 select 不同的是,引入了一个 ready_list 双向链表,callback 里面会把当前的 socket 加入到 ready_list 然后唤醒 epoll。
这样被唤醒的 epoll 只需要遍历 ready_list 即可,这个链表里一定是有数据可读的 socket,相比于 select 就不会做无用的遍历了。
同时收集到的可读的 fd 按理是要拷贝到用户空间的,这里又做了个优化,利用了 mmp,让用户空间和内核空间映射到同一块内存中,这样就避免了拷贝。
- poll_create是在内核区创建一个epoll相关的一些列结构,并且将一个句柄fd返回给用户态,后续的操作都是基于此fd的,参数size是告诉内核这个结构的元素的大小,类似于stl的vector动态数组,如果size不合适会涉及复制扩容,不过貌似4.1.2内核之后size已经没有太大用途了;
- epoll_ctl是将fd添加/删除于epoll_create返回的epfd中,其中epoll_event是用户态和内核态交互的结构,定义了用户态关心的事件类型和触发时数据的载体epoll_data;
- epoll_wait*是阻塞等待内核返回的可读写事件,epfd还是epoll_create的返回值,events是个结构体数组指针存储epoll_event,也就是将内核返回的待处理epoll_event结构都存储下来,maxevents告诉内核本次返回的最大fd数量,这个和events指向的数组是相关的;
- epoll_wait是用户态需监控fd的代言人,后续用户程序对fd的操作都是基于此结构的;
ET<的区别:
ET:边沿触发。
按照上面的逻辑就是 epoll 遍历 ready_list 的时候,会把 socket 从 ready_list 里面移除,然后读取这个 scoket 的事件。
而 LT,水平触发,有点不一样。
在这个模式下 epoll 遍历 ready_list 的时候,会把 socket 从 ready_list 里面移除,然后读取这个 scoket 的事件,如果这个 socket 返回了感兴趣的事件,那么当前这个 socket 会再被加入到 ready_list 中,这样下次调用 epoll_wait 的时候,还能拿到这个 socket。
举个栗子:
如果此时一个客户端同时发来了 5 个数据包,按正常的逻辑,只需要唤醒一次 epoll ,把当前 socket 加一次到 ready_list 就行了,不需要加 5 次。然后用户程序可以把 socket 接收队列的所有数据包都读完。
但假设用户程序就读了一个包,然后处理报错了,后面不读了,那后面的 4 个包咋办?
如果是 ET 模式,就读不了了,因为没有把 socket 加入到 ready_list 的触发条件了。除非这个客户端发了新的数据包过来,这样才会再把当前 socket 加入到 ready_list,在新包过来之前,这 4 个数据包都不会被读到。而 LT 模式不一样,因为每次读完有感兴趣的事件发生之后,会把当前 socket 再加入到 ready_list,所以下次肯定能读到这个 socket,所以后面的 4 个数据包会被访问到,不论客户端是否发送新包。
#include<stdio.h>
#include<string.h>
#include<sys/socket.h>
#include<unistd.h>
#include<fcntl.h>
#include<netinet/in.h>
#include<errno.h>
#include<pthread.h>
#include<sys/poll.h>
#include<sys/epoll.h>
#define BUFFER_LENGTH 1024
#define POLL_SIZE 1024
#define EVENTS_SIZE 1024
//线程
void *client_thread(void*arg){
int clientfd=*(int*)arg;
while(1){
char buffer[BUFFER_LENGTH]={0};
int buffer_size=recv(clientfd,buffer,BUFFER_LENGTH,0);
printf("size: %d buffer:%s\n",buffer_size,buffer);
send(clientfd,buffer,buffer_size,0);
}
}
int main(){
int sockfd=socket(AF_INET,SOCK_STREAM,0);//io
//服务端
struct sockaddr_in servaddr;
servaddr.sin_family=AF_INET;//ipv4
servaddr.sin_port=htons(9999);//端口 short
servaddr.sin_addr.s_addr=htonl(INADDR_ANY);//0.0.0.0 long
//bind(sockfd,sockaddr,sizeof(struct sockaddr)) 返回-1 bind失败
//(struct sockaddr*)&servaddr 强转
if(-1==bind(sockfd,(struct sockaddr*)&servaddr,sizeof(struct sockaddr))){
printf("bind failed:%s\n",strerror(errno));
return -1;
}
//监听
listen(sockfd,10);
printf("listen\n");
//睡眠5秒
//sleep(5);
//客户端
struct sockaddr_in clientaddr;
socklen_t len=sizeof(clientaddr);
#if 0
while(1){
//accept阻塞
int clientfd=accept(sockfd,(struct sockaddr*)&clientaddr,&len);
//printf("accept\n");
char buffer[BUFFER_LENGTH]={0};
int buffer_size=recv(clientfd,buffer,BUFFER_LENGTH,0);
printf("size: %d buffer:%s\n",buffer_size,buffer);
send(clientfd,buffer,buffer_size,0);
#endif
//使用线程的方式
#if 0
pthread_t threadid;
pthread_create(&threadid,NULL,client_thread,&clientfd);
}
#endif
//IO多路复用-select
#if 0 // select, int fd
//select(maxfd, rfds, wfds, efds, timeout);
fd_set rfds, rset;
FD_ZERO(&rfds);
FD_SET(sockfd, &rfds);
int maxfd = sockfd;
int clientfd = 0;
while (1) { // master
rset = rfds;
int nready = select(maxfd+1, &rset, NULL, NULL, NULL);
if (FD_ISSET(sockfd, &rset)) { //
clientfd = accept(sockfd, (struct sockaddr*)&clientaddr, &len);
printf("accept: %d\n", clientfd);
FD_SET(clientfd, &rfds);
if (clientfd > maxfd) maxfd = clientfd;
if (-- nready == 0) continue;
}
int i = 0;
for (i = sockfd+1; i <= maxfd;i ++) {
if (FD_ISSET(i, &rset)) {
char buffer[BUFFER_LENGTH] = {0};
int ret = recv(clientfd, buffer, BUFFER_LENGTH, 0);
if (ret == 0) {
close(clientfd);
break;
}
printf("ret: %d, buffer: %s\n", ret, buffer);
send(clientfd, buffer, ret, 0);
}
}
}
#endif
//IO多路复用-poll
#if 0
struct pollfd fds[POLL_SIZE] = {0};
fds[sockfd].fd = sockfd;
fds[sockfd].events = POLLIN;
int clientfd=0;
int maxfd=sockfd;
while(1){
int nready=poll(fds,maxfd+1,-1);
printf("hello\n");
if(fds[sockfd].revents & POLLIN){
clientfd = accept(sockfd, (struct sockaddr*)&clientaddr, &len);
printf("accept: %d\n", clientfd);
fds[clientfd].fd=clientfd;
fds[clientfd].events=POLLIN;
if (clientfd > maxfd) maxfd = clientfd;
if (-- nready == 0) continue;
}
int i=0;
for(;i<=maxfd;i++){
if(fds[i].revents & POLLIN){
printf("i : %d\n",i);
char buffer[BUFFER_LENGTH] = {0};
int ret = recv(clientfd, buffer, BUFFER_LENGTH, 0);
if (ret == 0) {
fds[clientfd].fd = -1;
fds[clientfd].events = 0;
close(clientfd);
break;
}
printf("ret: %d, buffer: %s\n",ret, buffer);
send(clientfd, buffer, ret, 0);
}
}
}
#endif
//IO多路复用-epoll
#if 1
//epoll
int epfd=epoll_create(1);//参数为任意大于0的值,在一开始的版本中由于采用的为数组
//所以参数为数组的大小,在后来的版本迭代中改用链表的数据结构
//所以参数不再起作用,任意大于0的值即可
struct epoll_event ev;
ev.events=EPOLLIN;
ev.data.fd=sockfd;
epoll_ctl(epfd,EPOLL_CTL_ADD,sockfd,&ev);
struct epoll_event events[EVENTS_SIZE]={0};
while(1){
int nready=epoll_wait(epfd,events,EVENTS_SIZE,-1);//-1:一直等待,0:不等待,大于0:等待对于的单位时常
if(nready<0)continue;
int i=0;
for(i=0;i<nready;i++){
int connfd=events[i].data.fd;
if(connfd==sockfd){
int clientfd=accept(sockfd,(struct sockaddr*)&clientaddr,&len);
if(clientfd<=0){
continue;
}
printf("clientfd: %d\n",clientfd);
ev.data.fd=clientfd;
ev.events=EPOLLIN;
epoll_ctl(epfd,EPOLL_CTL_ADD,clientfd,&ev);//添加
}else if(events[i].events&EPOLLIN){
char buffer[BUFFER_LENGTH]={0};
int n=recv(connfd,buffer,BUFFER_LENGTH,0);
if(n>0){
printf("recv: %s\n",buffer);
send(connfd, buffer, n, 0);
}
else if(n==0){
printf("close\n");
epoll_ctl(epfd,EPOLL_CTL_DEL,connfd,NULL);//移除
close(connfd);
}
}
}
}
#endif
}