上周面了个实习,感觉自己菜的一匹,唉,理论还是没有联系实际啊,继续学吧。
I/O复用使得程序能同时监听多个文件描述符,这对提高程序的性能至关重要。通常,网络程序在下列情况下需要使用I/O复用技术:
- 客户端程序要同时处理多个socket。
- 客户端程序要同时处理用户输入和网络连接。
- TCP服务器要同时处理监听socket和连接socket。
- 服务器要同时处理TCP请求和UDP请求。
- 服务器要同时监听多个端口,或者处理多种服务。
I/O复用虽然能同时监听多个文件描述符,但它本身是阻塞的。并且当多个文件描述符同时就绪时,如果不采取额外的措施,程序就只能按顺序依次处理其中的每一个文件描述符,这使得服务 器程序看起来像是串行工作的。如果要实现并发,只能使用多进程或多线程等编程手段。Linux下实现I/O复用的系统调用主要有select、poll和epoll,本章将依次学习,然后介绍使用它们的几个实例。
select系统调用
select系统调用的用途是:在一段指定时间内,监听用户感兴趣的文件描述符上的可读、可写和异常等事件。
select系统调用的原型如下:
#include<sys/select.h>
int select(int nfds,fd_set* readfds,fd_set* writefds,fd_set* exceptfds,struct
timeval* timeout);
- nfds参数指定被监听的文件描述符的总数。它通常被设置为select监听的所有文件描述符中的最大值加1,因为文件描述符是从0开始计数的;
- readfds、writefds和exceptfds参数分别指向可读、可写和异常等事件对应的文件描述符集合。应用程序调用select函数时,通过这3个参数传入自己感兴趣的文件描述符。select调用返回时,内核将修改它们来通知应用程序哪些文件描述符已经就绪。这3个参数是fd_set结构指针类型。fd_set结构体的定义如下:
#include<typesizes.h> #define__FD_SETSIZE 1024 #include<sys/select.h> #define FD_SETSIZE__FD_SETSIZE typedef long int__fd_mask; #undef__NFDBITS #define__NFDBITS(8*(int)sizeof(__fd_mask)) typedef struct { #ifdef__USE_XOPEN __fd_mask fds_bits[__FD_SETSIZE/__NFDBITS]; #define__FDS_BITS(set)((set)->fds_bits) #else __fd_mask__fds_bits[__FD_SETSIZE/__NFDBITS]; #define__FDS_BITS(set)((set)->__fds_bits) #endif }fd_set;
- timeout参数用来设置select函数的超时时间。它是一个timeval结构类型的指针,采用指针参数是因为内核将修改它以告诉应用程序select等待了多久。不过我们不能完全信任select调用返回后的timeout值,比如调用失败时timeout值是不确定的。timeval结构体的定义如下:
struct timeval { long tv_sec;/*秒数*/ long tv_usec;/*微秒数*/ };
select提供了一个微秒级的定时方式。如果给timeout变量的tv_sec成员和tv_usec成员都传递0,则select将立即返回。如果给timeout传递NULL,则select将一直阻塞,直到某个文件描述 符就绪。
select成功时返回就绪(可读、可写和异常)文件描述符的总数。如果在超时时间内没有任何文件描述符就绪,select将返回0。select失败时返回-1并设置errno。如果在select等待期间,程序接收到信号,则select立即返回-1,并设置errno为EINTR。
文件描述符就绪条件
在网络编程中,下列情况下socket可读:
- socket内核接收缓存区中的字节数大于或等于其低水位标记SO_RCVLOWAT。此时我们可以无阻塞地读该socket,并且读操作返回的字节数大于0。
- socket通信的对方关闭连接。此时对该socket的读操作将返回0。
- 监听socket上有新的连接请求。
- socket上有未处理的错误。此时我们可以使用getsockopt来读取和清除该错误。
下列情况下socket可写
- socket内核发送缓存区中的可用字节数大于或等于其低水位标记SO_SNDLOWAT。此时我们可以无阻塞地写该socket,并且写操作返回的字节数大于0。
- socket的写操作被关闭。对写操作被关闭的socket执行写操作将触发一个SIGPIPE信号。
- socket使用非阻塞connect连接成功或者失败(超时)之后。
- socket上有未处理的错误。此时我们可以使用getsockopt来读取和清除该错误。
网络程序中,select能处理的异常情况只有一种:socket上接收到带外数据。
socket上接收到普通数据和带外数据都将使select返回,但socket处于不同的就绪状态:前者处于可读状态,后者处于异常状态。下面代码描述了select是如何同时处理二者的:
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
int main( int argc, char* argv[] )
{
if( argc <= 2 )
{
printf( "usage: %s ip_address port_number\n", basename( argv[0] ) );
return 1;
}
const char* ip = argv[1];
int port = atoi( argv[2] );
printf( "ip is %s and port is %d\n", ip, port );
int ret = 0;
struct sockaddr_in address;
bzero( &address, sizeof( address ) );
address.sin_family = AF_INET;
inet_pton( AF_INET, ip, &address.sin_addr );
address.sin_port = htons( port );
int listenfd = socket( PF_INET, SOCK_STREAM, 0 );
assert( listenfd >= 0 );
ret = bind( listenfd, ( struct sockaddr* )&address, sizeof( address ) );
assert( ret != -1 );
ret = listen( listenfd, 5 );
assert( ret != -1 );
struct sockaddr_in client_address;
socklen_t client_addrlength = sizeof( client_address );
int connfd = accept( listenfd, ( struct sockaddr* )&client_address, &client_addrlength );
if ( connfd < 0 )
{
printf( "errno is: %d\n", errno );
close( listenfd );
}
char remote_addr[INET_ADDRSTRLEN];
printf( "connected with ip: %s and port: %d\n", inet_ntop( AF_INET, &client_address.sin_addr, remote_addr, INET_ADDRSTRLEN ), ntohs( client_address.sin_port ) );
char buf[1024];
//定义一系列宏来访问fd_set结构体中的位
fd_set read_fds;//可读
fd_set exception_fds;//异常
FD_ZERO( &read_fds );/*清除read_fds的所有位*/
FD_ZERO( &exception_fds );/*清除exception_fds的所有位*/
/*设置套接字描述符的属性*/
int nReuseAddr = 1;
/*SO_OOBINLINE:带外数据放入正常数据流*/
setsockopt( connfd, SOL_SOCKET, SO_OOBINLINE, &nReuseAddr, sizeof( nReuseAddr ) );
while( 1 )
{
/*每次调用select前都要重新在read_fds和exception_fds中设置文件描述符
connfd,因为事件发生之后,文件描述符集合将被内核修改*/
memset( buf, '\0', sizeof( buf ) );
FD_SET( connfd, &read_fds );
FD_SET( connfd, &exception_fds );
ret = select( connfd + 1, &read_fds, NULL, &exception_fds, NULL );
printf( "select one\n" );
if ( ret < 0 )
{
printf( "selection failure\n" );
break;
}
/*对于可读事件,采用普通的recv函数读取数据*/
if ( FD_ISSET( connfd, &read_fds ) )/*测试fdset的位fd是否被设置*/
{
ret = recv( connfd, buf, sizeof( buf )-1, 0 );
if( ret <= 0 )
{
break;
}
printf( "get %d bytes of normal data: %s\n", ret, buf );
}
/*对于异常事件,采用带MSG_OOB标志的recv函数读取带外数据*/
else if( FD_ISSET( connfd, &exception_fds ) )
{
ret = recv( connfd, buf, sizeof( buf )-1, MSG_OOB );
if( ret <= 0 )
{
break;
}
printf( "get %d bytes of oob data: %s\n", ret, buf );
}
}
close( connfd );
close( listenfd );
return 0;
}
不知道哪里出的问题,带外数据总是被当作普通数据。有没有大佬看到解释一下呀!
这是发送端代码:
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
int main( int argc, char* argv[] )
{
if( argc <= 2 )
{
printf( "usage: %s ip_address port_number\n", basename( argv[0] ) );
return 1;
}
const char* ip = argv[1];
int port = atoi( argv[2] );
struct sockaddr_in server_address;
bzero( &server_address, sizeof( server_address ) );
server_address.sin_family = AF_INET;
inet_pton( AF_INET, ip, &server_address.sin_addr );
server_address.sin_port = htons( port );
int sockfd = socket( PF_INET, SOCK_STREAM, 0 );
assert( sockfd >= 0 );
if ( connect( sockfd, ( struct sockaddr* )&server_address, sizeof( server_address ) ) < 0 )
{
printf( "connection failed\n" );
}
else
{
printf( "send normal data out\n" );
const char* obb_data = "abc";
const char* normal_data = "xyz";
send( sockfd, oob_data, strlen(oob_data), MSG_OOB);
send( sockfd, normal_data, strlen( normal_data ), 0 );
}
close( sockfd );
return 0;
}
poll系统调用
poll系统调用和select类似,也是在指定时间内轮询一定数量的文件描述符,以测试其中是否有就绪者。poll的原型如下:
#include<poll.h>
int poll(struct pollfd* fds,nfds_t nfds,int timeout);
- fds参数是一个pollfd结构类型的数组,它指定所有我们感兴趣的文件描述符上发生的可读、可写和异常等事件。pollfd结构体的定义如下:
struct pollfd
{
int fd;/*文件描述符*/
short events;/*注册的事件*/
short revents;/*实际发生的事件,由内核填充*/
};
其中,fd成员指定文件描述符;events成员告诉poll监听fd上的哪些事件,它是一系列事件的按位或;revents成员则由内核修改,以通知应用程序fd上实际发生了哪些事件。poll支持的事件类型如表所示:
- nfds参数指定被监听事件集合fds的大小。其类型nfds_t的定义如下:
typedef unsigned long int nfds_t;
- timeout参数指定poll的超时值,单位是毫秒。当timeout为-1时,poll调用将永远阻塞,直到某个事件发生;当timeout为0时,poll调用将立即返回。
poll系统调用的返回值的含义与select相同。
epoll系列系统调用
内核事件表
epoll是Linux特有的I/O复用函数。它在实现和使用上与select、poll有很大差异。首先,epoll使用一组函数来完成任务,而不是单个函数。 其次,epoll把用户关心的文件描述符上的事件放在内核里的一个事件表中,从而无须像select和poll那样每次调用都要重复传入文件描述符集或事件集。但epoll需要使用一个额外的文件描述符,来唯一标识内核中的这个事件表。这个文件描述符使用如下epoll_create函数来创建:
#include<sys/epoll.h>
int epoll_create(int size)
size参数现在并不起作用,只是给内核一个提示,告诉它事件表需要多大。该函数返回的文件描述符将用作其他所有epoll系统调用的第一个参数,以指定要访问的内核事件表。 下面的函数用来操作epoll的内核事件表:
#include<sys/epoll.h>
int epoll_ctl(int epfd,int op,int fd,struct epoll_event* event)
1)fd参数是要操作的文件描述符
2)op参数则指定操作类型。操作类型有如下3种:
- EPOLL_CTL_ADD,往事件表中注册fd上的事件。
- EPOLL_CTL_MOD,修改fd上的注册事件。
- EPOLL_CTL_DEL,删除fd上的注册事件。
3)event参数指定事件,它是epoll_event结构指针类型。epoll_event的 定义如下:
struct epoll_event
{
__uint32_t events;/*epoll事件*/
epoll_data_t data;/*用户数据*/
};
其中events成员描述事件类型, data成员用于存储用户数据。
epoll_wait函数
epoll系列系统调用的主要接口是epoll_wait函数。它在一段超时时间内等待一组文件描述符上的事件,其原型如下:
#include<sys/epoll.h>
int epoll_wait(int epfd,struct epoll_event*events,int maxevents,int timeout);
该函数成功时返回就绪的文件描述符的个数,失败时返回-1并设置 errno。
- timeout参数的含义与poll接口的timeout参数相同。
- maxevents参数指定最多监听多少个事件,它必须大于0。
- epoll_wait函数如果检测到事件,就将所有就绪的事件从内核事件表(由epfd参数指定)中复制到它的第二个参数events指向的数组中。
这个数组只用于输出epoll_wait检测到的就绪事件,而不像select和poll的数组参数那样既用于传入用户注册的事件,又用于输出内核检测到的就绪事件。这就极大地提高了应用程序索引就绪文件描述符的效率。下面代码体现了poll和epoll在使用上的差别:
/*如何索引poll返回的就绪文件描述符*/
int ret=poll(fds,MAX_EVENT_NUMBER,-1);
/*必须遍历所有已注册文件描述符并找到其中的就绪者(当然,可以利用ret来稍做优
化)*/
for(int i=0;i<MAX_EVENT_NUMBER;++i)
{
if(fds[i].revents&POLLIN)/*判断第i个文件描述符是否就绪*/
{
int sockfd=fds[i].fd;
/*处理sockfd*/
}
}
/*如何索引epoll返回的就绪文件描述符*/
int ret=epoll_wait(epollfd,events,MAX_EVENT_NUMBER,-1);
/*仅遍历就绪的ret个文件描述符*/
for(int i=0;i<ret;i++)
{
int sockfd=events[i].data.fd;
/*sockfd肯定就绪,直接处理*/
}
LT和ET模式
epoll对文件描述符的操作有两种模式:LT(Level Trigger,电平触发)模式和ET(Edge Trigger,边沿触发)模式。LT模式是默认的工作模式,这种模式下epoll相当于一个效率较高的poll。当往epoll内核事件表中注册一个文件描述符上的EPOLLET事件时,epoll将以ET模式来操作该文件描述符。ET模式是epoll的高效工作模式。
对于采用LT工作模式的文件描述符,当epoll_wait检测到其上有事件发生并将此事件通知应用程序后,应用程序可以不立即处理该事件。 这样,当应用程序下一次调用epoll_wait时,epoll_wait还会再次向应用程序通告此事件,直到该事件被处理。而对于采用ET工作模式的文件描述符,当epoll_wait检测到其上有事件发生并将此事件通知应用程序后,应用程序必须立即处理该事件,因为后续的epoll_wait调用将不再向应用程序通知这一事件。ET模式在很大程度上降低了同一个epoll事件被重复触发的次数,因此效率要比LT模式高。
下列代码体现了LT和ET在工作方式上的差异:
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <pthread.h>
#define MAX_EVENT_NUMBER 1024
#define BUFFER_SIZE 10
/*将文件描述符设置成非阻塞的*/
int setnonblocking( int fd )
{
int old_option = fcntl( fd, F_GETFL );
int new_option = old_option | O_NONBLOCK;
fcntl( fd, F_SETFL, new_option );
return old_option;
}
/*将文件描述符fd上的EPOLLIN注册到epollfd指示的epoll内核事件表中,参数
enable_et指定是否对fd启用ET模式*/
void addfd( int epollfd, int fd, bool enable_et )
{
epoll_event event;
event.data.fd = fd;
event.events = EPOLLIN;
if( enable_et )
{
event.events |= EPOLLET;
}
epoll_ctl( epollfd, EPOLL_CTL_ADD, fd, &event );
setnonblocking( fd );
}
/*LT模式的工作流程*/
void lt( epoll_event* events, int number, int epollfd, int listenfd )
{
char buf[ BUFFER_SIZE ];
for ( int i = 0; i < number; i++ )
{
int sockfd = events[i].data.fd;
if ( sockfd == listenfd )
{
struct sockaddr_in client_address;
socklen_t client_addrlength = sizeof( client_address );
int connfd = accept( listenfd, ( struct sockaddr* )&client_address, &client_addrlength );
addfd( epollfd, connfd, false );/*对connfd禁用ET模式*/
}
else if ( events[i].events & EPOLLIN )
{
/*只要socket读缓存中还有未读出的数据,这段代码就被触发*/
printf( "event trigger once\n" );
memset( buf, '\0', BUFFER_SIZE );
int ret = recv( sockfd, buf, BUFFER_SIZE-1, 0 );
if( ret <= 0 )
{
close( sockfd );
continue;
}
printf( "get %d bytes of content: %s\n", ret, buf );
}
else
{
printf( "something else happened \n" );
}
}
}
/*ET模式的工作流程*/
void et( epoll_event* events, int number, int epollfd, int listenfd )
{
char buf[ BUFFER_SIZE ];
for ( int i = 0; i < number; i++ )
{
int sockfd = events[i].data.fd;
if ( sockfd == listenfd )
{
struct sockaddr_in client_address;
socklen_t client_addrlength = sizeof( client_address );
int connfd = accept( listenfd, ( struct sockaddr* )&client_address, &client_addrlength );
addfd( epollfd, connfd, true );/*对connfd开启ET模式*/
}
else if ( events[i].events & EPOLLIN )
{
/*这段代码不会被重复触发,所以我们循环读取数据,以确保把socket读缓存中的所有
数据读出*/
printf( "event trigger once\n" );
while( 1 )
{
memset( buf, '\0', BUFFER_SIZE );
int ret = recv( sockfd, buf, BUFFER_SIZE-1, 0 );
if( ret < 0 )
{
/*对于非阻塞IO,下面的条件成立表示数据已经全部读取完毕。此后,epoll就能再次触
发sockfd上的EPOLLIN事件,以驱动下一次读操作*/
if( ( errno == EAGAIN ) || ( errno == EWOULDBLOCK ) )
{
printf( "read later\n" );
break;
}
close( sockfd );
break;
}
else if( ret == 0 )
{
close( sockfd );
}
else
{
printf( "get %d bytes of content: %s\n", ret, buf );
}
}
}
else
{
printf( "something else happened \n" );
}
}
}
int main( int argc, char* argv[] )
{
if( argc <= 2 )
{
printf( "usage: %s ip_address port_number\n", basename( argv[0] ) );
return 1;
}
const char* ip = argv[1];
int port = atoi( argv[2] );
int ret = 0;
struct sockaddr_in address;
bzero( &address, sizeof( address ) );
address.sin_family = AF_INET;
inet_pton( AF_INET, ip, &address.sin_addr );
address.sin_port = htons( port );
int listenfd = socket( PF_INET, SOCK_STREAM, 0 );
assert( listenfd >= 0 );
ret = bind( listenfd, ( struct sockaddr* )&address, sizeof( address ) );
assert( ret != -1 );
ret = listen( listenfd, 5 );
assert( ret != -1 );
epoll_event events[ MAX_EVENT_NUMBER ];
int epollfd = epoll_create( 5 );
assert( epollfd != -1 );
addfd( epollfd, listenfd, true );
while( 1 )
{
int ret = epoll_wait( epollfd, events, MAX_EVENT_NUMBER, -1 );
if ( ret < 0 )
{
printf( "epoll failure\n" );
break;
}
lt( events, ret, epollfd, listenfd );/*使用LT模式*/
//et( events, ret, epollfd, listenfd );/*使用ET模式*/
}
close( listenfd );
return 0;
}
服务器端运行一下这段代码,然后telnet到这个服务器程序上并一 次传输超过10字节(BUFFER_SIZE的大小)的数据,然后比较LT模式和ET模式的异同。会发现ET模式下事件被触发的次数要比LT模式下少。(但是奇怪的是做实验时二者触发次数是相同的,目前笔者还没解决该问题,后续有进展会在此处及时更新)。
EPOLLONESHOT事件
即使我们使用ET模式,一个socket上的某个事件还是可能被触发多次。这在并发程序中就会引起一个问题。比如一个线程(或进程,下同)在读取完某个socket上的数据后开始处理这些数据,而在数据的处理过程中该socket上又有新数据可读(EPOLLIN再次被触发),此时另外一个线程被唤醒来读取这些新的数据。于是就出现了两个线程同时操作一个socket的局面。这当然不是我们期望的。我们期望的是一个socket 连接在任一时刻都只被一个线程处理。这一点可以使用epoll的 EPOLLONESHOT事件实现。
对于注册了EPOLLONESHOT事件的文件描述符,操作系统最多触发其上注册的一个可读、可写或者异常事件,且只触发一次,除非我们使用epoll_ctl函数重置该文件描述符上注册的EPOLLONESHOT事件。 这样,当一个线程在处理某个socket时,其他线程是不可能有机会操作 该socket的。但反过来思考,注册了EPOLLONESHOT事件的socket一旦被某个线程处理完毕,该线程就应该立即重置这个socket上的 EPOLLONESHOT事件,以确保这个socket下一次可读时,其EPOLLIN 事件能被触发,进而让其他工作线程有机会继续处理这个socket。
下面代码展示了EPOLLONEDSHOT事件的使用
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <pthread.h>
#define MAX_EVENT_NUMBER 1024
#define BUFFER_SIZE 1024
struct fds
{
int epollfd;
int sockfd;
};
int setnonblocking( int fd )
{
int old_option = fcntl( fd, F_GETFL );
int new_option = old_option | O_NONBLOCK;
fcntl( fd, F_SETFL, new_option );
return old_option;
}
/*将fd上的EPOLLIN和EPOLLET事件注册到epollfd指示的epoll内核事件表中,参数
oneshot指定是否注册fd上的EPOLLONESHOT事件*/
void addfd( int epollfd, int fd, bool oneshot )
{
epoll_event event;
event.data.fd = fd;
event.events = EPOLLIN | EPOLLET;
if( oneshot )
{
event.events |= EPOLLONESHOT;
}
epoll_ctl( epollfd, EPOLL_CTL_ADD, fd, &event );
setnonblocking( fd );
}
/*重置fd上的事件。这样操作之后,尽管fd上的EPOLLONESHOT事件被注册,但是操作
系统仍然会触发fd上的EPOLLIN事件,且只触发一次*/
void reset_oneshot( int epollfd, int fd )
{
epoll_event event;
event.data.fd = fd;
event.events = EPOLLIN | EPOLLET | EPOLLONESHOT;
epoll_ctl( epollfd, EPOLL_CTL_MOD, fd, &event );
}
/*工作线程*/
void* worker( void* arg )
{
int sockfd = ( (fds*)arg )->sockfd;
int epollfd = ( (fds*)arg )->epollfd;
printf( "start new thread to receive data on fd: %d\n", sockfd );
char buf[ BUFFER_SIZE ];
memset( buf, '\0', BUFFER_SIZE );
/*循环读取sockfd上的数据,直到遇到EAGAIN错误*/
while( 1 )
{
int ret = recv( sockfd, buf, BUFFER_SIZE-1, 0 );
if( ret == 0 )
{
close( sockfd );
printf( "foreiner closed the connection\n" );
break;
}
else if( ret < 0 )
{
if( errno == EAGAIN )
{
reset_oneshot( epollfd, sockfd );
printf( "read later\n" );
break;
}
}
else
{/*休眠5s,模拟数据处理过程*/
printf( "get content: %s\n", buf );
sleep( 5 );
}
}
printf( "end thread receiving data on fd: %d\n", sockfd );
}
int main( int argc, char* argv[] )
{
if( argc <= 2 )
{
printf( "usage: %s ip_address port_number\n", basename( argv[0] ) );
return 1;
}
const char* ip = argv[1];
int port = atoi( argv[2] );
int ret = 0;
struct sockaddr_in address;
bzero( &address, sizeof( address ) );
address.sin_family = AF_INET;
inet_pton( AF_INET, ip, &address.sin_addr );
address.sin_port = htons( port );
int listenfd = socket( PF_INET, SOCK_STREAM, 0 );
assert( listenfd >= 0 );
ret = bind( listenfd, ( struct sockaddr* )&address, sizeof( address ) );
assert( ret != -1 );
ret = listen( listenfd, 5 );
assert( ret != -1 );
epoll_event events[ MAX_EVENT_NUMBER ];
int epollfd = epoll_create( 5 );
assert( epollfd != -1 );
/*注意,监听socket listenfd上是不能注册EPOLLONESHOT事件的,否则应用程序只
能处理一个客户连接!因为后续的客户连接请求将不再触发listenfd上的EPOLLIN事件*/
addfd( epollfd, listenfd, false );
while( 1 )
{
int ret = epoll_wait( epollfd, events, MAX_EVENT_NUMBER, -1 );
if ( ret < 0 )
{
printf( "epoll failure\n" );
break;
}
for ( int i = 0; i < ret; i++ )
{
int sockfd = events[i].data.fd;
if ( sockfd == listenfd )
{
struct sockaddr_in client_address;
socklen_t client_addrlength = sizeof( client_address );
int connfd = accept( listenfd, ( struct sockaddr* )&client_address, &client_addrlength );
/*对每个非监听文件描述符都注册EPOLLONESHOT事件*/
addfd( epollfd, connfd, true );
}
else if ( events[i].events & EPOLLIN )
{
pthread_t thread;
fds fds_for_new_worker;
fds_for_new_worker.epollfd = epollfd;
fds_for_new_worker.sockfd = sockfd;
/*新启动一个工作线程为sockfd服务*/
pthread_create( &thread, NULL, worker, ( void* )&fds_for_new_worker );
}
else
{
printf( "something else happened \n" );
}
}
}
close( listenfd );
return 0;
}
从工作线程函数worker来看,如果一个工作线程处理完某个socket 上的一次请求(我们用休眠5s来模拟这个过程)之后,又接收到该socket上新的客户请求,则该线程将继续为这个socket服务。并且因为该 socket上注册了EPOLLONESHOT事件,其他线程没有机会接触这个socket,如果工作线程等待5s后仍然没收到该socket上的下一批客户数据,则它将放弃为该socket服务。同时,它调用reset_oneshot函数来重置该socket上的注册事件,这将使epoll有机会再次检测到该socket上的EPOLLIN事件,进而使得其他线程有机会为该socket服务。 由此看来,尽管一个socket在不同时间可能被不同的线程处理,但同一时刻肯定只有一个线程在为它服务。这就保证了连接的完整性,从而避免了很多可能的竞态条件。