I/O复用使得程序能同时监听多个文件描述符。在以下场景中需要使用到IO复用技术:
- 客户端程序要同时处理多个socket,非阻塞connect技术
- 客户端程序要同时处理用户输入和网络连接,聊天室程序
- TCP服务器要同时处理监听socket和连接socket
- 服务器要同时处理TCP请求和UDP请求,回射服务器
- 服务器要同时监听多个端口,或者处理多种服务,xinetd服务器
需要指出的是,I/O复用虽然能同时监听多个文件描述符,但它本身是阻塞的。并且当多个文件描述符同时就绪时,如果不采取额外的措施,程序就只能按顺序依次处理其中的每一个文件描述符,这使得服务器程序看起来像是串行工作的。如果要实现并发,只能使用多进程或多线程等编程手段。
一、select系统调用
select可以在一段指定的时间内监听用户感兴趣的文件描述符上可读、可写和异常等事件。
select是跨平台的,支持Linux,Max,Windows。
1.1、select API
#include <sys/select.h>
int select(int nfds, fd_set* readfds, fd_set* writefds, fd_set* exceptfds, struct timeval* timeout);
-
nfds
参数指定被监听的文件描述符的总数。它通常被设置为select监听的所有文件描述符中的最大值加1,因为文件描述符是从0开始计数的。 -
readfds
、writefds
和exceptfds
参数分别指向可读、可写和异常等事件对应的文件描述符集合。这3个参数是fd_set
结构指针类型。fd_set
结构体的定义如下:-
#include <typesizes.h> #define __FD_SETSIZE 1024 #include <sys/select.h> #define FD_SETSIZE __FD_SETSIZE typedef long int__fd_mask; #undef __NFDBITS #define __NFDBITS (8 * (int)sizeof(__fd_mask)) typedef struct { #ifdef __USE_XOPEN __fd_mask fds_bits[__FD_SETSIZE/__NFDBITS]; #define __FDS_BITS(set) ((set)->fds_bits) #else __fd_mask__fds_bits[__FD_SETSIZE/__NFDBITS]; #define __FDS_BITS(set) ((set)->__fds_bits) #endif }fd_set;
-
-
fd_set
结构体仅包含一个整型数组,该数组的每个元素的每一位(bit)标记一个文件描述符。fd_set
能容纳的文件描述符数量由FD_SETSIZE
指定,这就限制了select
能同时处理的文件描述符的总量。下面的函数提供了位操作。-
#include <sys/select.h> // 将文件描述符fd从set集合中删除 == 将fd对应的标志位设置为0 void FD_CLR(int fd, fd_set *set); // 判断文件描述符fd是否在set集合中 == 读一下fd对应的标志位到底是0还是1 int FD_ISSET(int fd, fd_set *set); // 将文件描述符fd添加到set集合中 == 将fd对应的标志位设置为1 void FD_SET(int fd, fd_set *set); // 将set集合中, 所有文件文件描述符对应的标志位设置为0, 集合中没有添加任何文件描述符 void FD_ZERO(fd_set *set);
-
-
timeout
参数用来设置select
函数的超时时间。它是一个timeval结构类型的指针,采用指针参数是因为内核将修改它以告诉应用程序select等待了多久。如果给timeout
变量的tv_sec成员和tv_usec成员都传递0,则select将立即返回。如果给timeout传递NULL,则select将一直阻塞,直到某个文件描述符就绪。-
struct timeval { long tv_sec;/*秒数*/ long tv_usec;/*微秒数*/ };
-
-
select成功时返回就绪(可读、可写和异常)文件描述符的总数。如果在超时时间内没有任何文件描述符就绪,select将返回0。select失败时返回-1并设置errno。如果在select等待期间,程序接收到信号,则select立即返回-1,并设置errno为EINTR。
1.2、文件描述符就绪条件
下列情况下socket可读:
socket
内核接收缓存区中的字节数大于或等于其低水位标记SO_RCVLOWAT。此时我们可以无阻塞地读该socket,并且读操作返回的字节数大于0。socket
通信的对方关闭连接。此时对该socket的读操作将返回0。socket
上有新的连接请求。socket
上有未处理的错误。此时我们可以使用getsockopt来读取和清除该错误。
下列情况socket可写:
socket
内核发送缓存区中的可用字节数大于或等于其低水位标记SO_SNDLOWAT。此时我们可以无阻塞地写该socket,并且写操作返回的字节数大于0。socket
的写操作被关闭。对写操作被关闭的socket执行写操作将触发一个SIGPIPE信号。socket
使用非阻塞connect连接成功或者失败(超时)之后。socket
上有未处理的错误。此时我们可以使用getsockopt来读取和清除该错误。
1.3、带外数据
socket
上接收到普通数据和带外数据都将使select返回,但socket处于不同的就绪状态:前者处于可读状态,后者处于异常状态。
同时接收普通数据和带外数据:
服务端:
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/select.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#define PORT 8080
#define BUFFER_SIZE 8192
int main(int argc,char* argv[]) {
char buf[BUFFER_SIZE];
ssize_t bytes_read;
int server_socket, client_socket;
struct sockaddr_in server_addr, client_addr;
socklen_t addr_len = sizeof(struct sockaddr_in);
// 创建socket
server_socket = socket(AF_INET, SOCK_STREAM, 0);
if (server_socket == -1) {
perror("socket creation failed");
exit(EXIT_FAILURE);
}
// 命名socket
memset(&server_addr, 0, addr_len);
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(PORT);
server_addr.sin_addr.s_addr = INADDR_ANY;
if (bind(server_socket, (struct sockaddr *) &server_addr, addr_len) == -1) {
perror("bind failed");
close(server_socket);
exit(EXIT_FAILURE);
}
// 监听socket
if (listen(server_socket, 5) == -1) {
perror("listen failed");
close(server_socket);
exit(EXIT_FAILURE);
}
printf("Server is listening on port %d...\n", PORT);
// 连接
client_socket = accept(server_socket, (struct sockaddr*) &client_addr,&addr_len);
if (client_socket<0) {
printf("errno is:%d\n",errno);
close(server_socket);
}
fd_set read_fds; // 监听可读事件
fd_set exception_fds; // 监听异常事件
FD_ZERO(&read_fds); // 初始化
FD_ZERO(&exception_fds); // 初始化
while(1) {
// 初始化buf
memset(buf,'\0',sizeof(buf));
// 每次调用select前都要重新在read_fds和exception_fds中设置文件描述符client_socket,因为事件发生之后,文件描述符集合将被内核修改
FD_SET(client_socket, &read_fds);
FD_SET(client_socket, &exception_fds);
FD_SET(server_socket, &read_fds);
FD_SET(server_socket, &exception_fds);
int ret = select(client_socket+1,&read_fds,NULL,&exception_fds,NULL);
if(ret<0) {
printf("selection failure\n");
break;
}
/* 对于可读事件,采用普通的recv函数读取数据 */
if(FD_ISSET(client_socket, &read_fds)) {
ret = recv(client_socket,buf,sizeof(buf)-1,0);
if(ret<=0) break;
printf("get %d bytes of normal data:%s",ret,buf);
}
/* 对于异常事件,采用带MSG_OOB标志的recv函数读取带外数据 */
else if(FD_ISSET(client_socket, &exception_fds)) {
ret = recv(client_socket, buf, sizeof(buf)-1, MSG_OOB);
if(ret<=0) break;
printf("get %d bytes of oob data:%s",ret,buf);
}
}
close(server_socket);
close(client_socket);
return 0;
}
客户端:
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#define SERVER_IP "127.0.0.1"
#define SERVER_PORT 8080
#define BUFFER_SIZE 1024
int main() {
int client_socket;
struct sockaddr_in server_addr;
char buffer[BUFFER_SIZE];
ssize_t bytes_read;
// 创建socket
client_socket = socket(AF_INET, SOCK_STREAM, 0);
if (client_socket == -1) {
perror("socket creation failed");
exit(EXIT_FAILURE);
}
// 设置服务器地址信息
memset(&server_addr, 0, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(SERVER_PORT);
if (inet_pton(AF_INET, SERVER_IP, &server_addr.sin_addr) <= 0) {
perror("invalid server address");
close(client_socket);
exit(EXIT_FAILURE);
}
// 连接到服务器
if (connect(client_socket, (struct sockaddr *) &server_addr, sizeof(server_addr)) == -1) {
perror("connection failed");
close(client_socket);
exit(EXIT_FAILURE);
}
printf("Connected to server\n");
// 发送消息给服务器
const char *message1 = "Hello, this is client!\n";
send(client_socket, message1, strlen(message1), 0);
sleep(1);
// 发送带外数据
const char *message2 = "Hello, this is client and data is oob!\n";
send(client_socket, message2, strlen(message2), MSG_OOB);
// 关闭连接
close(client_socket);
}
仿真结果