目录
思考一个问题:
I/O多路复用select函数
代码实现
net.h
server.c:
socket.c
思考一个问题:
我们还是把视角放到应用B从TCP缓冲区中读取数据这个环节来。如果在并发的环境下,可能会N个人向应用B发送消息,这种情况下我们的应用就必须创建多个线程去读取数据,每个线程都会自己调用recvfrom 去读取数据。那么此时情况可能如下图:
如上图一样,并发情况下服务器很可能一瞬间会收到几十上百万的请求,这种情况下应用B就需要创建几十上百万的线程去读取数据,同时又因为应用线程是不知道什么时候会有数据读取,为了保证消息能及时读取到,那么这些线程自己必须不断的向内核发送recvfrom 请求来读取数据;
那么问题来了,这么多的线程不断调用recvfrom 请求数据,先不说服务器能不能扛得住这么多线程,就算扛得住那么很明显这种方式是不是太浪费资源了,线程是我们操作系统的宝贵资源,大量的线程用来去读取数据了,那么就意味着能做其它事情的线程就会少。
所以,有人就提出了一个思路,能不能提供一种方式,可以由一个线程监控多个网络请求(我们后面将称为fd文件描述符,linux系统把所有网络请求以一个fd来标识),这样就可以只需要一个或几个线程就可以完成数据状态询问的操作,当有数据准备就绪之后再分配对应的线程去读取数据,这么做就可以节省出大量的线程资源出来,这个就是IO复用模型的思路。
正如上图,IO复用模型的思路就是系统提供了一种函数可以同时监控多个fd的操作,这个函数就是我们常说到的select、poll、epoll函数,有了这个函数后,应用线程通过调用select函数就可以同时监控多个fd,select函数监控的fd中只要有任何一个数据状态准备就绪了,select函数就会返回可读状态,这时询问线程再去通知处理数据的线程,对应线程此时再发起recvfrom请求去读取数据。
术语描述:进程通过将一个或多个fd传递给select,阻塞在select操作上,select帮我们侦测多个fd是否准备就绪,当有fd准备就绪时,select返回数据可读状态,应用程序再调用recvfrom读取数据。
I/O多路复用select函数
select函数:
int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds,
struct timeval *timeout);
/*timeout结构体
struct timeval {
long tv_sec; //秒
long tv_usec; //微妙
**/
参数:
nfds: 是三个集合中编号最高的文件描述符,加上1;
readfds :可读集合;(常用)
writefds :可写集合;
exceptfds: 异常集合;
timeout:
NULL: 永久阻塞;
0:非阻塞模式;
poll函数:
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
epoll API:
epoll_create
epoll_wait
epoll_ctl
代码实现
net.h
#ifndef _NET_H_
#define _NET_H_
#include <stdio.h>
#include <stdlib.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <strings.h>
#include <errno.h>
typedef struct sockaddr Addr;
typedef struct sockaddr_in Addr_in;
#define BACKLOG 5
#define ErrExit(msg) do { perror(msg); exit(EXIT_FAILURE); } while(0)
void Argment(int argc, char *argv[]);
int CreateSocket(char *argv[]);
int DataHandle(int fd);
#endif
server.c:
#include "net.h"
#include <sys/select.h>
#define MAX_SOCK_FD 1024
int main(int argc, char *argv[])
{
int i, ret, fd, newfd;
fd_set set, tmpset;
Addr_in clientaddr;
socklen_t clientlen = sizeof(Addr_in);
/*检查参数,小于3个 直接退出进程*/
Argment(argc, argv);
/*创建已设置监听模式的套接字*/
fd = CreateSocket(argv);
FD_ZERO(&set);
FD_ZERO(&tmpset);
FD_SET(fd, &set);
while(1){
tmpset = set;
if( (ret = select(MAX_SOCK_FD, &tmpset, NULL, NULL, NULL)) < 0)
ErrExit("select");
if(FD_ISSET(fd, &tmpset) ){
/*接收客户端连接,并生成新的文件描述符*/
if( (newfd = accept(fd, (Addr *)&clientaddr, &clientlen) ) < 0)
perror("accept");
printf("[%s:%d]已建立连接\n",
inet_ntoa(clientaddr.sin_addr), ntohs(clientaddr.sin_port));
FD_SET(newfd, &set);
}else{ //处理客户端数据
for(i = fd + 1; i < MAX_SOCK_FD; i++){
if(FD_ISSET(i, &tmpset)){
if( DataHandle(i) <= 0){
if( getpeername(i, (Addr *)&clientaddr, &clientlen) )
perror("getpeername");
printf("[%s:%d]断开连接\n",
inet_ntoa(clientaddr.sin_addr), ntohs(clientaddr.sin_port));
FD_CLR(i, &set);
}
}
}
}
}
return 0;
}
socket.c
#include "net.h"
void Argment(int argc, char *argv[]){
if(argc < 3){
fprintf(stderr, "%s<addr><port>\n", argv[0]);
exit(0);
}
}
int CreateSocket(char *argv[]){
/*创建套接字*/
int fd = socket(AF_INET, SOCK_STREAM, 0);
if(fd < 0)
ErrExit("socket");
/*允许地址快速重用*/
int flag = 1;
if( setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof(flag) ) )
perror("setsockopt");
/*设置通信结构体*/
Addr_in addr;
bzero(&addr, sizeof(addr) );
addr.sin_family = AF_INET;
addr.sin_port = htons( atoi(argv[2]) );
/*绑定通信结构体*/
if( bind(fd, (Addr *)&addr, sizeof(Addr_in) ) )
ErrExit("bind");
/*设置套接字为监听模式*/
if( listen(fd, BACKLOG) )
ErrExit("listen");
return fd;
}
int DataHandle(int fd){
char buf[BUFSIZ] = {};
Addr_in peeraddr;
socklen_t peerlen = sizeof(Addr_in);
if( getpeername(fd, (Addr *)&peeraddr, &peerlen) )
perror("getpeername");
int ret = recv(fd, buf, BUFSIZ, 0);
if(ret < 0)
perror("recv");
if(ret > 0){
printf("[%s:%d]data: %s\n",
inet_ntoa(peeraddr.sin_addr), ntohs(peeraddr.sin_port), buf);
}
return ret;
}