编写main.cpp
1.socket通信
服务器应用程序可以通过读取和写入 Socket 对象
- 来监听来自客户端的请求
- 并向客户端返回响应
#define MAX_FD 65536 // 最大的文件描述符个数
#define MAX_EVENT_NUMBER 10000 // 监听的最大的事件数量
// 添加信号捕捉
void addsig(int sig, void( handler )(int)){//信号处理函数
struct sigaction sa;//创建信号量
memset( &sa, '\0', sizeof( sa ) );
sa.sa_handler = handler;
sigfillset( &sa.sa_mask );//设置信号临时阻塞等级
assert( sigaction( sig, &sa, NULL ) != -1 );//注册信号
}
int main( int argc, char* argv[] ) {
// 判断参数个数,至少要传递一个端口号
if( argc <= 1 ) {
printf( "usage: %s port_number\n", basename(argv[0]));
return 1;
}
// 获取端口号,转换成整数
int port = atoi( argv[1] );
addsig( SIGPIPE, SIG_IGN );
// 使用socketAPI编写Reactor组件,通过监听socket文件描述符获取连接请求
int listenfd = socket( PF_INET, SOCK_STREAM, 0 ); // 创建用于监听的socket文件描述符
int ret = 0;
// 存放服务器的地址信息
struct sockaddr_in address;
address.sin_family = AF_INET;//使用IPv4协议
address.sin_addr.s_addr = INADDR_ANY; //监听所有网卡的连接请求
address.sin_port = htons( port );//将端口号(大端小端)转换为网络字节序,并保存到address结构体中
// 端口复用
int reuse = 1;
// 让多个进程绑定同一个端口,从而实现负载均衡或者高可用等功能
setsockopt( listenfd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof( reuse ) );
// 绑定服务器的地址信息
ret = bind( listenfd, ( struct sockaddr* )&address, sizeof( address ) );
ret = listen( listenfd, 5 );
...
return 0;
}
2.初始化线程池
// 创建线程池,并初始化
// 来一个任务之后,要封装成一个任务对象,交给线程池去处理
threadpool< http_conn >* pool = NULL;
try {
pool = new threadpool<http_conn>;
} catch( ... ) {
return 1;
}
- 事件处理器:在初始化线程池时,输入的参数是http_conn对象,这个就是用于处理任务的任务类
// 创建一个数组用于保存所有的客户端信息
// 每当有新连接进来时,都会在 users 数组中找到一个未使用的 http_conn 对象,
// 进行初始化并保存该连接对应的信息
http_conn* users = new http_conn[ MAX_FD ];
3.创建 epoll 对象 和事件数组 events
- 将监听的文件描述符 listenfd 添加到 epoll 对象中
// 创建epoll对象,和事件数组,添加
epoll_event events[ MAX_EVENT_NUMBER ];
int epollfd = epoll_create( 5 );//创建epoll对象,通过该文件描述符对 epoll 进行控制和管理(监听)
// 将监听的文件描述符添加到 epoll 对象中
addfd( epollfd, listenfd, false );
http_conn::m_epollfd = epollfd;//赋值
// 添加文件描述符到 epoll 中
extern void addfd( int epollfd, int fd, bool one_shot );
- 其中,在http_conn.cpp 中编写 addfd 函数
// 向epoll中添加需要监听的文件描述符
void addfd( int epollfd, int fd, bool one_shot ) {
epoll_event event;
event.data.fd = fd;
event.events = EPOLLIN | EPOLLRDHUP;
if(one_shot)
{
// 防止同一个通信被不同的线程处理
event.events |= EPOLLONESHOT;
}
epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);
// 设置文件描述符非阻塞
setnonblocking(fd);
}
4.同步 I/O 模拟 Proactor 模式
// 检测函数----检测epoll树中是否有就绪的文件描述符
// 创建了epfd,设置好某个fd上需要检测事件并将该fd绑定到epfd上去后,就可以调用epoll_wait
// 检测事件了
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
- 参数:
- epfd : epoll实例对应的文件描述符
- events : 传出参数,保存了发送了变化的文件描述符的信息
- maxevents : 第二个参数结构体数组的大小
- timeout : 阻塞时间
- 0 : 不阻塞
- -1 : 阻塞,直到检测到fd数据发生变化,解除阻塞
- > 0 : 阻塞的时长(毫秒)
- 返回值:
- 成功,返回发送变化的文件描述符的个数 > 0
- 失败 -1

// 同步 I/O 模拟 Proactor 模式的工作流程
while(true){
/*
1、阻塞等待文件描述符监听到的事件
2、遍历事件数组,判断事件类型,进行对应处理
*/
}
- while循环不断检测有无事件发生,具体就是使用 epoll_wait 获取监听 socket 的文件描述符所返回的事件数量
- epoll_wait()函数不断地检测文件描述符epollfd上是否有I/O事件发生
- 当有可读或可写事件发生时,epoll_wait()函数 会返回一个events数组,并将其中的事件信息填充到数组中,遍历数组中的所有事件,根据事件类型进行相应的处理。
// 编写Reactor组件
while(true) {
// 具体来说就是使用epoll_wait获取监听socket的文件描述符所返回得到事件数量
int number = epoll_wait( epollfd, events, MAX_EVENT_NUMBER, -1 );
if ( ( number < 0 ) && ( errno != EINTR ) ) {
printf( "epoll failure\n" );
break;
}
// 循环遍历事件数组
for ( int i = 0; i < number; i++ ) {
int sockfd = events[i].data.fd;
if( sockfd == listenfd ) {
// 有客户端连接进来
...
}else if( events[i].events & ( EPOLLRDHUP | EPOLLHUP | EPOLLERR ) ) {
// 对方异常断开或错误异常
...
}else if(events[i].events & EPOLLIN) {
// 判断是否有读事件发生
...
} else if( events[i].events & EPOLLOUT ) {
// 判断是否有写事件发生
...
}
}
}
**************************************************************************************************************
以下总结来自这篇文章:【从0开始编写webserver·基础篇#02】服务器的核心---I/O处理单元和任务类 - dayceng - 博客园 (cnblogs.com)
- epoll_wait 是一个系统调用函数,用于等待文件描述符上的I/O事件;
- epollfd 是通过 epoll_create函数 创建的 epoll实例的文件描述符,它用于管理需要监视的文件描述符集合;
- listenfd 是服务器应用程序使用的套接字文件描述符,它与 epollfd 关联,并使用 epoll_ctl函数 ,将其添加到 epollfd 所管理的文件描述符集合中。
epollfd 代表了一个 epoll实例,负责管理需要监视的文件描述符集合,而 listenfd 则是需要被监视的文件描述符之一,它被添加到 epollfd 所管理的文件描述符集合中,以便在有新的客户端连接请求时能够及时通知服务器程序。当 epoll_wait 函数返回时,它会将事件列表填入events数组中,告诉服务器哪些文件描述符发生了I/O事件,然后服务器应用程序根据这些事件来执行相应的操作。
- EPOLLRDHUP表示TCP连接的远程端关闭或半关闭连接,即对方关闭了socket连接或者shutdown写操作。
- EPOLLHUP表示挂起的连接或监听套接字已经关闭。它可能是一个错误,也可能是一个正常情况,因为它只代表文件描述符不再可用,而不是一定有错误。
- EPOLLERR表示错误事件。例如:socket被对端重置(rst);对于udp的epoll来说,他可以支持多个端口绑定,当然你不能bind两次同一个端口,那么第二次就会返回-1并且errno会被设置为EADDRINUSE;还有就是当读取时没有数据则返回-1并且errno被设置为EAGAIN 。
**************************************************************************************************************
(1)listenfd 有读事件发生
- 有新的客户端连接进来,接受客户端连接 accept() ,将新的客户端数据初始化并存储到http_conn 类型的 users 数组中
if( sockfd == listenfd ) {
// 有客户端连接进来
struct sockaddr_in client_address;
socklen_t client_addrlength = sizeof( client_address );
int connfd = accept( listenfd, ( struct sockaddr* )&client_address, &client_addrlength );
if ( connfd < 0 ) {
printf( "errno is: %d\n", errno );
continue;
}
if( http_conn::m_user_count >= MAX_FD ) {
// 目前连接满了
printf("服务器正忙...\n");
close(connfd);
continue;
}
// 将新的客户的数据初始化,放到数组中
users[connfd].init( connfd, client_address);
}
- 在服务器中,通常会使用一个监听socket(listenfd)来接受客户端的连接请求
- 当有新的客户端连接到来时,服务器会使用accept函数创建一个新的连接socket(connfd)
- 这个新的socket会与客户端的socket建立起通信连接
(2)sockfd 的 EPOLLRDHUP、EPOLLHUP 或 EPOLLERR事件
// 对方异常断开或错误异常
else if( events[i].events & ( EPOLLRDHUP | EPOLLHUP | EPOLLERR ) ) {
// http_conn类型的users数组中该客户端的状态设置为关闭
users[sockfd].close_conn();
}
(3)sockfd 有读事件发生
- 当events中存储的事件为sockfd可读事件时,表示该socket有数据可读, 此时主线程一次性读取完所有请求数据read(),成功读完后要交给工作线程处理。
- 将读取到的数据封装成一个请求对象并插入请求队列。调用线程池,追加任务。线程池执行 run 函数,不断从队列去取。取到就做业务处理,解析、生成响应数据
else if(events[i].events & EPOLLIN) {
// 判断是否有读事件发生
if(users[sockfd].read()) {// 一次性读取完所有请求数据,read()
// 成功读完后要交给工作线程处理
// 调用线程池,追加任务
// 线程池执行 run 函数,不断从队列去取
// 取到就做业务处理,解析、生成响应数据
pool->append(users + sockfd);
} else {//读失败,关闭
users[sockfd].close_conn();
}
}
-
users 数组中的每个元素都代表一个客户端连接, 数组的下标是该客户端的文件描述符 fd
-
users + sockfd 就是获取到该客户端连接的 http_conn 对象的指针
-
调用线程池对象的 append 函数,该指针作为参数
-
将该指针所指向的 http_conn 对象添加到线程池的任务队列中, 等待线程池的工作线程来处理
(4)sockfd有写事件发生
- 主线程调用 epoll_wait 等待 socket 可写
- 当 socket 可写时,epoll_wait 通知主线程。主线程往 socket 上写入服务器处理客户请求的结果
else if( events[i].events & EPOLLOUT ) {
// 将响应数据发送给客户端,若发送成功则继续等待下一个写事件发生,
// 否则关闭该链接
if( !users[sockfd].write() ) {
users[sockfd].close_conn();
}
}
5.完整代码
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include "locker.h"
#include "threadpool.h"
#include "http_conn.h"
#define MAX_FD 65536 // 最大的文件描述符个数
#define MAX_EVENT_NUMBER 10000 // 监听的最大的事件数量
/*
自定义函数 addfd,需要把监听的文件描述符 listenfd 添加到 epoll 对象中,
即将它加入到内核事件表中
*/
// 添加文件描述符到 epoll 中
extern void addfd( int epollfd, int fd, bool one_shot );
extern void removefd( int epollfd, int fd );
// 添加信号捕捉
void addsig(int sig, void( handler )(int)){//信号处理函数
struct sigaction sa;//创建信号量
memset( &sa, '\0', sizeof( sa ) );
sa.sa_handler = handler;
sigfillset( &sa.sa_mask );//设置信号临时阻塞等级
assert( sigaction( sig, &sa, NULL ) != -1 );//注册信号
}
/*
模拟 proactor 模式,主线程监听事件
当有读事件产生,在主线程中一次性读出来,封装成一个任务对象(用任务类)
然后交给子线程(线程池队列中的工作线程),线程池再去取任务做任务
*/
int main( int argc, char* argv[] ) {
// 判断参数个数,至少要传递一个端口号
if( argc <= 1 ) {
printf( "usage: %s port_number\n", basename(argv[0]));
return 1;
}
// 获取端口号,转换成整数
int port = atoi( argv[1] );
addsig( SIGPIPE, SIG_IGN );
// 创建线程池,并初始化
// 来一个任务之后,要封装成一个任务对象,交给线程池去处理
threadpool< http_conn >* pool = NULL;
try {
pool = new threadpool<http_conn>;
} catch( ... ) {
return 1;
}
// 创建一个数组用于保存所有的客户端信息
// 每当有新连接进来时,都会在 users 数组中找到一个未使用的 http_conn 对象,进行初始化并保存该连接对应的信息
http_conn* users = new http_conn[ MAX_FD ];
// 使用socketAPI编写Reactor组件,通过监听socket文件描述符获取连接请求
int listenfd = socket( PF_INET, SOCK_STREAM, 0 ); // 创建用于监听的socket文件描述符
int ret = 0;
// 存放服务器的地址信息
struct sockaddr_in address;
address.sin_family = AF_INET;//使用IPv4协议
address.sin_addr.s_addr = INADDR_ANY; //监听所有网卡的连接请求
address.sin_port = htons( port );//将端口号(大端小端)转换为网络字节序,并保存到address结构体中
// 端口复用
int reuse = 1;
setsockopt( listenfd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof( reuse ) );//让多个进程绑定同一个端口,从而实现负载均衡或者高可用等功能
// 绑定服务器的地址信息
ret = bind( listenfd, ( struct sockaddr* )&address, sizeof( address ) );
ret = listen( listenfd, 5 );
// 创建epoll对象,和事件数组,添加
epoll_event events[ MAX_EVENT_NUMBER ];
int epollfd = epoll_create( 5 );//创建epoll对象,通过该文件描述符对 epoll 进行控制和管理(监听)
// 将监听的文件描述符添加到 epoll 对象中
addfd( epollfd, listenfd, false );
http_conn::m_epollfd = epollfd;//赋值
// 编写Reactor组件
while(true) {
// 具体来说就是使用epoll_wait获取监听socket的文件描述符所返回得到事件数量
int number = epoll_wait( epollfd, events, MAX_EVENT_NUMBER, -1 );
if ( ( number < 0 ) && ( errno != EINTR ) ) {
printf( "epoll failure\n" );
break;
}
// 循环遍历事件数组
for ( int i = 0; i < number; i++ ) {
int sockfd = events[i].data.fd;
if( sockfd == listenfd ) {
// 有客户端连接进来
struct sockaddr_in client_address;
socklen_t client_addrlength = sizeof( client_address );
int connfd = accept( listenfd, ( struct sockaddr* )&client_address, &client_addrlength );
if ( connfd < 0 ) {
printf( "errno is: %d\n", errno );
continue;
}
if( http_conn::m_user_count >= MAX_FD ) {
// 目前连接满了
printf("服务器正忙...\n");
close(connfd);
continue;
}
// 将新的客户的数据初始化,放到数组中
users[connfd].init( connfd, client_address);
} else if( events[i].events & ( EPOLLRDHUP | EPOLLHUP | EPOLLERR ) ) {
// 对方异常断开或错误异常
users[sockfd].close_conn();
} else if(events[i].events & EPOLLIN) {
// 判断是否有读事件发生
if(users[sockfd].read()) {// 一次性读出数据,read()
// 成功读完后要交给工作线程处理
// 调用线程池,追加任务
// 线程池执行 run 函数,不断从队列去取
// 取到就做业务处理,解析、生成响应数据
pool->append(users + sockfd);
} else {//读失败,关闭
users[sockfd].close_conn();
}
} else if( events[i].events & EPOLLOUT ) {
if( !users[sockfd].write() ) {
users[sockfd].close_conn();
}
}
}
}
close( epollfd );
close( listenfd );
delete [] users;
delete pool;
return 0;
}
推荐和参考此文章:
【从0开始编写webserver·基础篇#02】服务器的核心---I/O处理单元和任务类 - dayceng - 博客园 (cnblogs.com)