一、I/O多路复用简介
socket通信,在Linux系统其是就是文件描述符,对应于内核中的缓冲区(包含读缓冲区与写缓冲区),实质上是对读写缓冲区的操作;多路复用,多条路复用成一条路。
I/O多路复用使得程序能同时监听多个文件描述符,Linux下实现I/O多路复用的系统调用主要有select、poll、epoll。
例如,有十个客户端连接,我们需要知道那个客户端发送了数据,那个客户端没有发送数据,应该怎么操作?遍历文件描述符的读缓冲区。但是不能同时知道。使用I/O复用可以同时监控多个文件描述符。
几种常见的IO模型;操作IO实质就是看文件描述符中的读写缓冲区是否有数据。
利用阻塞等待,来判断对应的读写缓冲区是否有数据,其优点是,不需要占用CPU时间,缺点是,同一时刻只能处理一个操作,效率较低。
进一步的,添加进程或者线程,可以同时处理多个操作。其优点就是可以连接多个客户端,实现并发操作。缺点是,线程或者进程会消耗资源,消耗CPU资源(进程调度)。
代码如何写?BIO模型;
1、首先创建一个socket,然后链接,监听,阻塞在accept,当有一个客户端连接的时候,进行通信,阻塞在read或者recv中,当有数据操作时,才继续执行,但是如果阻塞在accept的时候,另外在有客户端进来,就不能再accept。
2、改进采用多线程操作, 在线程中阻塞read或者recv。可以同时使得其他客户端进行连接。
根本问题就是阻塞;
非阻塞,忙轮循
不存在阻塞技术,因为采用不停的轮循,去判断是否有客户端的连接,是否有数据的收发,不需要阻塞在那一块,但是需要不停的轮循遍历,浪费系统资源。
优点:提高程序的执行效率;
缺点:需要占用更多的CPU和资源管理;
IO多路转接技术
内核检测数据,可以知道有没有数据到,有没有客户端进行连接,但是具体是哪一个还需要轮循遍历。 但是这个遍历不再是遍历read或者recv函数,而是另外的01效率较比之前是有所提升的。
内核检测,不仅能检测到是否有数据到,还能告诉你是那几个数据到了。在程序中只需呀检测一次,实现多路复用为一路的。
二、select 技术
主旨思想:
1、构造一个关于文件描述符的列表,将要监听的文件描述符添加到该列表中。
2、调用一个系统API——select,监听该列表中的文件描述,直到这些文件描述符中的一个或者过个进行了IO操作,该函数才返回。
此函数是阻塞的;函数对文件描述符的检测的操作是由内核完成的。
3、返回时,会告诉进程有多少描述符要进行IO操作。
其余相关API:
// 将参数文件描述符fd对应的标志位设置为0
void FD_CLR(int fd, fd_set *set);
// 判断fd对应的标志位是0还是1, 返回值 : fd对应的标志位的值,0,返回0, 1,返回1
int FD_ISSET(int fd, fd_set *set);
// 将参数文件描述符fd 对应的标志位,设置为1
void FD_SET(int fd, fd_set *set);
// fd_set一共有1024 bit, 全部初始化为0
void FD_ZERO(fd_set *set);
创建一个文件描述符列表,其大小为1024,代表1024个bit位,对应一个文件描述符。检测四个文件描述符,将对应的文件描述符对应标志位设置为1,调用select函数。nfds+1的作用是让内核遍历的位置。将创建的文件描述符,从用户态拷贝到内核态,由内核进行判断,判断完成后再由内核态拷贝到用户态,进而输出检测结果。
select.c
#include <stdio.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <sys/select.h>
int main()
{
//创建socket
int lfd=socket(PF_INET,SOCK_STREAM,0);
struct sockaddr_in saddr;
saddr.sin_port=htons(9999);
saddr.sin_family=AF_INET;
saddr.sin_addr.s_addr=INADDR_ANY;
//绑定
bind(lfd,(struct sockaddr*)&saddr,sizeof(saddr));
//监听
listen(lfd,8);
//创建一个fd_set集合,存放的是需要检测的文件描述符
fd_set rdset,tmp;
//初始化
FD_ZERO(&rdset);
//添加要检测的文件描述符
FD_SET(lfd,&rdset);
int maxfd=lfd;
//在循环中监听——调用select函数,让内核帮忙进行检测;
while(1)
{
tmp=rdset;
int ret = select(maxfd+1,&tmp,NULL,NULL,NULL);
if(ret==-1)
{
perror("select");
exit(-1);
}
else if (ret==0)
{
continue;
}
else if (ret>0)
{
//说明文件描述符对应的缓冲区数据发生了改变;
if(FD_ISSET(lfd,&tmp))
{
//表示有新的客户端连接进来
struct sockaddr_in cliaddr;
int len=sizeof(cliaddr);
int cfd= accept(lfd,(struct sockaddr *)&cliaddr,&len);
printf("新的客户端添加:%d\n",cfd);
//将新的文件描述符添加到集合中去;
FD_SET(cfd,&rdset);
//更新最大的文件描述符
maxfd= maxfd>cfd ? maxfd:cfd;
}
for(int i=lfd+1;i<=maxfd;i++)
{
if(FD_ISSET(i,&tmp))
{
//说明这个文件描述符对应的客户端发来了数据;
char buf[1024]={0};
int len =read(i,buf,sizeof(buf));
if(len==-1)
{
perror("read");
exit(-1);
}
else if(len==0)
{
printf("client close...\n");
close(i);
FD_CLR(i,&rdset);
}
else if(len>0)
{
printf("read buf:%s\n",buf);
write(i,buf,strlen(buf+1));
}
}
}
}
}
close(lfd);
return 0;
}
client.c
#include <stdio.h>
#include <arpa/inet.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
int main() {
// 创建socket
int fd = socket(PF_INET, SOCK_STREAM, 0);
if(fd == -1) {
perror("socket");
return -1;
}
struct sockaddr_in seraddr;
inet_pton(AF_INET, "127.0.0.1", &seraddr.sin_addr.s_addr);
seraddr.sin_family = AF_INET;
seraddr.sin_port = htons(9999);
// 连接服务器
int ret = connect(fd, (struct sockaddr *)&seraddr, sizeof(seraddr));
if(ret == -1){
perror("connect");
return -1;
}
int num = 0;
while(1) {
char sendBuf[1024] = {0};
sprintf(sendBuf, "send data %d", num++);
write(fd, sendBuf, strlen(sendBuf) + 1);
// 接收
int len = read(fd, sendBuf, sizeof(sendBuf));
if(len == -1) {
perror("read");
return -1;
}else if(len > 0) {
printf("read buf = %s\n", sendBuf);
} else {
printf("服务器已经断开连接...\n");
break;
}
sleep(1);
//usleep(1000);
}
close(fd);
return 0;
}
运行结果
服务器:
客户端1
客户端2
缺点:
1、每次调用select,都需要把fd集合从用户态拷贝到内核态,这个开销在内核遍历传递进来的所有fd,这个开销在fd很多时也很大。
2、同时每次调用select都需要在内核遍历传递进来的所有fd,这个开销在fd很多时也很大。
3、select支持的文件描述符数量太小了,默认是1024。
4、fds集合不能重复使用,每次需要重置。
三、poll技术
改进select中两个缺点:
1、select支持的文件描述符数量太小,默认是1024;
2、fds集合不能重复使用,每次需要重置;
1024是因为fd_set就是这种类型,改用结构体数组,可以自定义结构体内部大小,再用结构体成员来扩充文件描述符的标志位,可以不需要重置。
对于events和revents的取值有以下的设置:
利用poll实现:
poll.c
#include <sys/poll.h>
#include <stdio.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <sys/select.h>
int main()
{
//创建socket
int lfd=socket(PF_INET,SOCK_STREAM,0);
struct sockaddr_in saddr;
saddr.sin_port=htons(9999);
saddr.sin_family=AF_INET;
saddr.sin_addr.s_addr=INADDR_ANY;
//绑定
bind(lfd,(struct sockaddr*)&saddr,sizeof(saddr));
//监听
listen(lfd,8);
//初始化检测的文件描述符数组
struct pollfd fds[1024];
for(int i=0;i<1024;i++)
{
fds[i].fd=-1;
fds[i].events=POLLIN;
}
fds[0].fd=lfd;
int nfds=0;
while(1)
{
//调用poll系统函数,让内核帮忙检测哪些文件描述符有数据
int ret=poll(fds,nfds+1,-1);
if(ret==-1)
{
perror("poll");
exit(-1);
}
else if (ret==0)
{
continue;
}
else if (ret>0)
{
//说明文件描述符对应的缓冲区数据发生了改变;
if(fds[0].revents&POLLIN)
{
//表示有新的客户端连接进来
struct sockaddr_in cliaddr;
int len=sizeof(cliaddr);
int cfd= accept(lfd,(struct sockaddr *)&cliaddr,&len);
printf("新的客户端添加:%d\n",cfd);
//将新的文件描述符添加到集合中去;
for(int i=1;i<1024;i++)
{
if(fds[i].fd==-1)
{
fds[i].fd=cfd;
fds[i].events=POLLIN;
break;
}
}
//更新最大的文件描述符
nfds= nfds>cfd ? nfds:cfd;
}
for(int i=1;i<=nfds;i++)
{
if(fds[i].revents & POLLIN)
{
//说明这个文件描述符对应的客户端发来了数据;
char buf[1024]={0};
int len =read(fds[i].fd,buf,sizeof(buf));
if(len==-1)
{
perror("read");
exit(-1);
}
else if(len==0)
{
printf("client close...\n");
fds[i].fd=-1;
close(fds[i].fd);
}
else if(len>0)
{
printf("read buf:%s\n",buf);
write(i,buf,strlen(buf+1));
}
}
}
}
}
close(lfd);
return 0;
}
运行结果
服务器:
客户端1:
客户端2:
四、epoll技术
poll技术是对select技术的一个改进,解决了select技术中后两个缺点,但是还存在两个缺点:
1、每次调用都要进行一个拷贝;将要检测的文件描述符的信息进行内核的拷贝。
2、调用时仍然需要进行遍历。只能知道有多少返回值发生了改变,但是不知道是哪一个改变了,因此需要进行遍历。
epoll技术原理:
在内核区创建一个eventpoll数据,返回一个文件描述符,通过这个文件描述符可以操作这块区域的数据。两个重要的成员:rbr与rdlist,rbr采用红黑树的结构;rdlist采用双链表的结构。
效率提高:1、没有用户态到内核态的拷贝过程;2、之前是线性数组结构,现在是红黑树的结构,现在遍历的速度更快。
相关API函数说明:
涉及头文件:#include<sys/epoll>
1、 int epoll_creat(int size)
//创建一个新的epoll实例,在内核中创建了一个数据,这个数据中有两个主要的数据,一个是需要检测的文件描述符的信息(红黑树),一个是就绪列表,存放检测到数据发生改变的文件描述符信息(双向链表)。
参数:目前没有意义;随便一个数就行;
返回值:-1,失败。>0,文件描述符,用来操作epoll实例;
2、int epoll_ctl(int epfd,int op,int fd,struct epoll_event *event);
//对epoll实例进行管理,添加、删除、修改文件描述符信息;
参数:
epfd:epoll实例对应的文件描述符;
op:要进行什么操作;
EPOLL_CTL_ADD: 添加
EPOLL_CTL_MOD: 修改
EPOLL_CTL_DEL: 删除fd:要检测的文件描述符,
event:要检测文件描述符什么事情;
struct epoll_event {
uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
}常见的Epoll检测事件:
- EPOLLIN
- EPOLLOUT
- EPOLLERRtypedef union epoll_data {
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;3、int epoll_wait(int epfd,struct epoll_event *events,int maxevents,int timeout)
//检测函数
参数:epfd:epoll实例对应的文件描述符;
events:传出参数,保存了发送了变化的文件描述符信息;
maxevents:第二个参数结构体数组的大小;
timeout:阻塞时间;
0:不阻塞;-1:阻塞(直到检测到fd数据发生变化,解除阻塞);
>0:阻塞的时长(毫秒);
返回值:
成功返回发生变化的描述符的个数;失败返回-1;
epoll.c
#include <stdio.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <sys/epoll.h>
int main()
{
//创建socket
int lfd=socket(PF_INET,SOCK_STREAM,0);
struct sockaddr_in saddr;
saddr.sin_port=htons(9999);
saddr.sin_family=AF_INET;
saddr.sin_addr.s_addr=INADDR_ANY;
//绑定
bind(lfd,(struct sockaddr*)&saddr,sizeof(saddr));
//监听
listen(lfd,8);
//调用epoll_creat 创建一个epoll实例
int epfd= epoll_create(100);
//将监听的文件描述符相关的检测信息添加到epoll实例中;
struct epoll_event epev;
epev.events=EPOLLIN;
epev.data.fd=lfd;
epoll_ctl(epfd,EPOLL_CTL_ADD,lfd,&epev);
struct epoll_event epevs[1024];
while(1)
{
int ret=epoll_wait(epfd,epevs,1024,-1);
if(ret==-1)
{
perror("epoll_wait");
exit(-1);
}
printf("ret=%d\n",ret);
for(int i=0;i<ret;i++)
{
int curfd=epevs[i].data.fd;
if(curfd==lfd)
{
//有客户端连接
struct sockaddr_in cliaddr;
int len=sizeof(cliaddr);
int cfd= accept(lfd,(struct sockaddr *)&cliaddr,&len);
printf("新的客户端添加:%d\n",cfd);
epev.events=EPOLLIN;
epev.data.fd=cfd;
epoll_ctl(epfd,EPOLL_CTL_ADD,cfd,&epev);
}
else{
// if(epevs[i].events&EPOLLOUT)
// {
// continue;
// }
//有数据到达
char buf[1024]={0};
int len =read(curfd,buf,sizeof(buf));
if(len==-1)
{
perror("read");
exit(-1);
}
else if(len==0)
{
printf("client close...\n");
epoll_ctl(epfd,EPOLL_CTL_DEL,curfd,NULL);
close(curfd);
}
else if(len>0)
{
printf("read buf:%s\n",buf);
write(curfd,buf,strlen(buf+1));
}
}
}
}
close(lfd);
close(epfd);
return 0;
}
client.c
#include <stdio.h>
#include <arpa/inet.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
int main() {
// 创建socket
int fd = socket(PF_INET, SOCK_STREAM, 0);
if(fd == -1) {
perror("socket");
return -1;
}
struct sockaddr_in seraddr;
inet_pton(AF_INET, "127.0.0.1", &seraddr.sin_addr.s_addr);
seraddr.sin_family = AF_INET;
seraddr.sin_port = htons(9999);
// 连接服务器
int ret = connect(fd, (struct sockaddr *)&seraddr, sizeof(seraddr));
if(ret == -1){
perror("connect");
return -1;
}
int num = 0;
while(1) {
char sendBuf[1024] = {0};
sprintf(sendBuf, "send data %d", num++);
write(fd, sendBuf, strlen(sendBuf) + 1);
// 接收
int len = read(fd, sendBuf, sizeof(sendBuf));
if(len == -1) {
perror("read");
return -1;
}else if(len > 0) {
printf("read buf = %s\n", sendBuf);
} else {
printf("服务器已经断开连接...\n");
break;
}
//sleep(1);
usleep(1000);
}
close(fd);
return 0;
}
运行结果:
epoll.c
客户端1
客户端2
epoll的工作模式:
LT模式(水平触发)
假设委托内核检测读事件->检测fd的读缓冲区;
读缓冲区有数据->epoll检测到了会给用户通知;
用户不读数据,数据一直在缓冲区,epoll会一直通知;
用户只读了一部分数据,epoll会一直通知;
缓冲区数据读完了,不通知。
LT(level-triggered)是缺省的工作方式(默认工作模式),并且同时支持block与no block socket。内核告诉你一个文件描述符是否就绪,然后你可以对这个就绪的fd进行IO操作。如果你不做任何操作,内核还是继续通知你。
ET模式(边沿触发)
假设委托内核检测读事件->检测fd的读缓冲区;
读缓冲区有数据->epoll检测到了会给用户通知;
用户不读数据,数据一直在缓冲区,epoll下次检测的时候就不通知;
用户只读了一部分数据,epoll不通知;
缓冲区数据读完了,不通知。
ET(edge-trigged)是高速工作方式,只支持no-clock socket,在这种工作模式下,当描述符从未就绪变为就绪时,内核通过epoll告诉你,然后他会假设你知道文件描述符已经就绪,并且不会再为那个文件描述符发送更多的就绪通知,直到你做了某些操作导致那个文件描述符不再为就绪态,但是请注意,如果一直不对这个fd进行IO操作(从而导致它再次变成未就绪态),内核不会发送更多的通知。
ET模式在很大程度上减少了epoll事件被重复触发的次数,因此效率比LT模式高,epoll工作在ET模式的时候,必须使用非阻塞套接口,以避免由一个文件句柄的阻塞读/写操作把处理多个文件描述符的任务饿死。
水平触发测试:
#include <stdio.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <sys/epoll.h>
int main()
{
//创建socket
int lfd=socket(PF_INET,SOCK_STREAM,0);
struct sockaddr_in saddr;
saddr.sin_port=htons(9999);
saddr.sin_family=AF_INET;
saddr.sin_addr.s_addr=INADDR_ANY;
//绑定
bind(lfd,(struct sockaddr*)&saddr,sizeof(saddr));
//监听
listen(lfd,8);
//调用epoll_creat 创建一个epoll实例
int epfd= epoll_create(100);
//将监听的文件描述符相关的检测信息添加到epoll实例中;
struct epoll_event epev;
epev.events=EPOLLIN;
epev.data.fd=lfd;
epoll_ctl(epfd,EPOLL_CTL_ADD,lfd,&epev);
struct epoll_event epevs[1024];
while(1)
{
int ret=epoll_wait(epfd,epevs,1024,-1);
if(ret==-1)
{
perror("epoll_wait");
exit(-1);
}
printf("ret=%d\n",ret);
for(int i=0;i<ret;i++)
{
int curfd=epevs[i].data.fd;
if(curfd==lfd)
{
//有客户端连接
struct sockaddr_in cliaddr;
int len=sizeof(cliaddr);
int cfd= accept(lfd,(struct sockaddr *)&cliaddr,&len);
printf("新的客户端添加:%d\n",cfd);
epev.events=EPOLLIN;
epev.data.fd=cfd;
epoll_ctl(epfd,EPOLL_CTL_ADD,cfd,&epev);
}
else{
// if(epevs[i].events&EPOLLOUT)
// {
// continue;
// }
//有数据到达
char buf[5]={0};
int len =read(curfd,buf,sizeof(buf));
if(len==-1)
{
perror("read");
exit(-1);
}
else if(len==0)
{
printf("client close...\n");
epoll_ctl(epfd,EPOLL_CTL_DEL,curfd,NULL);
close(curfd);
}
else if(len>0)
{
printf("read buf:%s\n",buf);
write(curfd,buf,strlen(buf+1));
}
}
}
}
close(lfd);
close(epfd);
return 0;
}
服务器
客户端
边沿触发:
边沿触发不是默认的,需要进行设置;在哪设置呢?常见的Epoll检测事件,添加EPOLLET;
设置了边沿触发,这种模式你要检测的缓冲区里面有数据,epoll检测的时候会通知你,如果没有读取完,epoll再次检测的时候,就不会通知你了。
再次发送数据,他就还能触发一次,读取顺序按照读缓冲区中的内容顺序进行读取的。那么边沿触发如果要一次性将数据读出来,那如何操作呢?
#include <stdio.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <sys/epoll.h>
#include <fcntl.h>
#include <unistd.h>
#include <errno.h>
int main()
{
//创建socket
int lfd=socket(PF_INET,SOCK_STREAM,0);
struct sockaddr_in saddr;
saddr.sin_port=htons(9999);
saddr.sin_family=AF_INET;
saddr.sin_addr.s_addr=INADDR_ANY;
//绑定
bind(lfd,(struct sockaddr*)&saddr,sizeof(saddr));
//监听
listen(lfd,8);
//调用epoll_creat 创建一个epoll实例
int epfd= epoll_create(100);
//将监听的文件描述符相关的检测信息添加到epoll实例中;
struct epoll_event epev;
epev.events=EPOLLIN;
epev.data.fd=lfd;
epoll_ctl(epfd,EPOLL_CTL_ADD,lfd,&epev);
struct epoll_event epevs[1024];
while(1)
{
int ret=epoll_wait(epfd,epevs,1024,-1);
if(ret==-1)
{
perror("epoll_wait");
exit(-1);
}
printf("ret=%d\n",ret);
for(int i=0;i<ret;i++)
{
int curfd=epevs[i].data.fd;
if(curfd==lfd)
{
//有客户端连接
struct sockaddr_in cliaddr;
int len=sizeof(cliaddr);
int cfd= accept(lfd,(struct sockaddr *)&cliaddr,&len);
printf("新的客户端添加:%d\n",cfd);
//设置cfd非阻塞属性;
int flag= fcntl(cfd,F_GETFL);
flag |=O_NONBLOCK;
fcntl(cfd,F_SETFL,flag);
epev.events=EPOLLIN | EPOLLET;//设置边沿触发
epev.data.fd=cfd;
epoll_ctl(epfd,EPOLL_CTL_ADD,cfd,&epev);
}
else{
if(epevs[i].events&EPOLLOUT)
{
continue;
}
//循环读取所有数据
char buf[5]={0};
int len=0;
while((len=read(curfd,buf,sizeof(buf)))>0)
{
//打印数据
// printf("recv data:%s\n",buf);
write(STDOUT_FILENO,buf,len);
write(curfd,buf,len);
}
if(len==0)
{
printf("client closed...");
}
else if (len==-1)
{
if(errno==EAGAIN)
{
printf("data over....");
}else
{
perror("read");
exit(-1);
}
}
}
}
}
close(lfd);
close(epfd);
return 0;
}
服务器端:
客户端: