文章目录
- @[TOC](文章目录)
- 一、五种IO模型概念
- 1.阻塞IO
- 2.非阻塞IO
- 3.信号驱动IO
- 4.多路复用/多路转接IO
- 5.异步IO
- 二、非阻塞IO之fcntl应用
- 1.fcntl系统调用接口介绍
- 2.用fcntl实现非阻塞IO
- 三、多路转接IO之select应用
- 1.select接口介绍
- 2.使用select实现多路转接IO
- select的优缺点
- 四、多路转接IO之poll应用
- 1.poll接口介绍
- 2.使用poll实现多路转接IO
- 3.poll特点
- 五、多路转接IO之epoll应用
- 1.epoll接口介绍
- 2.epoll原理
- 3.用epoll实现多路转接IO
- 4.reactor服务器概念
- 5.epoll工作模式
- 六、reactor服务器模式设计
- 1.多线程模式设计理念
- 2.master slaver模式设计理念
文章目录
- @[TOC](文章目录)
- 一、五种IO模型概念
- 1.阻塞IO
- 2.非阻塞IO
- 3.信号驱动IO
- 4.多路复用/多路转接IO
- 5.异步IO
- 二、非阻塞IO之fcntl应用
- 1.fcntl系统调用接口介绍
- 2.用fcntl实现非阻塞IO
- 三、多路转接IO之select应用
- 1.select接口介绍
- 2.使用select实现多路转接IO
- select的优缺点
- 四、多路转接IO之poll应用
- 1.poll接口介绍
- 2.使用poll实现多路转接IO
- 3.poll特点
- 五、多路转接IO之epoll应用
- 1.epoll接口介绍
- 2.epoll原理
- 3.用epoll实现多路转接IO
- 4.reactor服务器概念
- 5.epoll工作模式
- 六、reactor服务器模式设计
- 1.多线程模式设计理念
- 2.master slaver模式设计理念
一、五种IO模型概念
IO的本质是等待+数据拷贝,而等待的本质是检测IO事件是否就绪.因为操作系统将系统缓冲区数据拷贝的用户缓冲区需要系统缓冲区数据就绪以及用户缓冲区空间就绪,从用户缓冲区拷贝数据到系统缓冲区需要用户缓冲区同样如此.达到IO条件叫做IO时间就绪.根据等待和数据拷贝的方式方法的不同衍生出了五种IO模型.
1.阻塞IO
在内核数据没有准备好之前,读/写系统调用会一直阻塞.
2.非阻塞IO
不管内核数据有没有准备好,读/写系统调用都会直接返回,而不是阻塞.
3.信号驱动IO
不用每次都调用系统调用检测IO事件是否就绪,IO事件就绪时,系统调用发送SIGIO信号通知进程,进程捕捉信号,设置递达动作处理处理数据.
4.多路复用/多路转接IO
同时等待多个IO事件就绪,有IO事件就绪就处理,使处理IO事件更加高效.
5.异步IO
由内核在后台数据拷贝完成时, 通知应用程序使用数据(而信号驱动是告诉应用程序何时可以开始拷贝数据).
二、非阻塞IO之fcntl应用
1.fcntl系统调用接口介绍
int fcntl(int fd, int cmd, ... /* arg */ );
功能:操作一个打开的文件描述符,操作有cmd参数决定
参数:fd为被操作的文件描述符,cmd为fd的操作指令宏值,...可变参数如何使用由cmd决定.
cmd的可选宏值:
F_GETFL:获取文件的访问模式和文件状态标志,可变参数忽略
F_SETFL:用可变参数指定的宏值设置文件状态标志,在Linux操作系统下,此命令只能修改
O_APPEND, O_ASYNC, O_DIRECT, O_NOATIME和O_NONBLOCK标志。
返回值:如果调用成功,返回值与取决于cmd命令,cmd是F_GETFL返回值就是文件描述标志,cmd除了F_DUPFD, F_GETFD,F_GETFL,F_GETLEASE,F_GETOWN,F_GETSIG, F_GETPIPE_SZ这些命令其他命令调用成功后返回值都是0.失败返回-1.
2.用fcntl实现非阻塞IO
#include <iostream>
#include <unistd.h>
#include <fcntl.h>
#include <cstdio>
#include <vector>
#include <functional>
using namespace std;
void SetNnblock(int fd)
{
int flag = fcntl(fd, F_GETFL);
if (flag != -1)
{
fcntl(fd, F_SETFL, flag | O_NONBLOCK);
}
}
// 任务加载容器
using func = function<void(void)>;
vector<func> task;
// 其他任务
void Task1() { cout << "task1" << endl; }
void Task2() { cout << "task2" << endl; }
void Task3() { cout << "task3" << endl; }
void LoadTask()
{
task.push_back(Task1);
task.push_back(Task2);
task.push_back(Task3);
}
int main()
{
// 加载任务
LoadTask();
// 设置fd非阻塞属性
SetNnblock(0);
char buffer[2048];
while (true)
{
// 执行其他任务
for (int i = 0; i < task.size(); i++)
task[i]();
// 非阻塞读
ssize_t n = read(0, buffer, sizeof(buffer) - 1);
if (n > 0)
{
buffer[n - 1] = 0;
cout << ">: " << buffer << endl;
}
else
{
if (errno == EAGAIN || errno == EWOULDBLOCK)
{
// 底层数据没有准备好,希望你下次继续来检测
HandlerALLTask();
sleep(1);
std::cout << "data not ready" << std::endl;
continue;
}
else if (errno == EINTR)
{
// 这次IO被信号中断,也需要重新读取
continue;
}
}
sleep(1);
}
return 0;
}
三、多路转接IO之select应用
1.select接口介绍
int select(int nfds, fd_set *readfds, fd_set *writefds,
fd_set *exceptfds, struct timeval *timeout);
功能:程序一次等待多个文件描述符,这个文件描述符可以是用来链接的或者是IO数据的,直到这个文件描述符的IO事件为就绪状态.
参数: nfds:需要被等待的文件描述符其中的最大值+1;
timeout:时间参数,timeval结构体可以设置秒和微米两个选项.如果两个参数都为0,表示select不阻塞等待,如果设置为null表示没有文件描述符就绪就一直阻塞等待,结构体其中一个参数>0表示这个时间内阻塞等待直到与文件描述符就绪,函数返回,timeout被内核设置剩余时间反馈回来,或者超时直接返回,timeout被设置为0.
struct timeval
{
long tv_sec; /* seconds秒 */
long tv_usec; /* microseconds */
}
readfds,writefds,exceptfds,都是位图结构体,用来讲需要内核等待的描述符按照读写异常事件类型设置到对应的对象中,这三个是输入输出型参数,用户将需要内核检测的文件描述符设置到结构体中,传参给内核,内核将检测结果覆盖设置到结构体中反馈回来.
typedef struct
{
#ifdef __USE_XOPEN
__fd_mask fds_bits[__FD_SETSIZE / __NFDBITS];
# define __FDS_BITS(set) ((set)->fds_bits)
#else
__fd_mask __fds_bits[__FD_SETSIZE / __NFDBITS];
# define __FDS_BITS(set) ((set)->__fds_bits)
#endif
} fd_set;
设置fd_set对象中有专门的宏函数可以使用.
void FD_CLR(int fd, fd_set *set);
判断fd是否在set位图中
int FD_ISSET(int fd, fd_set *set);
将fd设置到set位图中
void FD_SET(int fd, fd_set *set);
初始化set
void FD_ZERO(fd_set *set);
返回值:成功则返回三个就绪事件位图中设置的文件描述符总数,失败返回-1,返回后readfds,writefds,exceptfds,timeout参数需要重新设置.
2.使用select实现多路转接IO
log.hpp
#pragma once
#include <iostream>
#include <cstdio>
#include <cstdarg>
#include <sys/types.h>
#include <unistd.h>
#include <sstream>
#include <time.h>
#include<cstdio>
namespace log_space
{
enum
{
DEBUG,
INFO,
ERROR,
FATAL,
};
std::string GetLevelStr(int level)
{
std::string str;
switch (level)
{
case DEBUG:
str = "DEBUG";
break;
case INFO:
str = "INFO";
break;
case ERROR:
str = "ERROR";
break;
case FATAL:
str = "FATAL";
break;
default:
str = "UNKNOW";
break;
}
return str;
}
std::string GetTimeStr()
{
time_t _time = time(nullptr);
struct tm *pt = localtime(&_time);
char buffer[1024];
snprintf(buffer, sizeof(buffer), "%d-%d-%d_%d:%d:%d", pt->tm_year + 1900, pt->tm_mon + 1,
pt->tm_mday, pt->tm_hour, pt->tm_min, pt->tm_sec);
std::string str = buffer;
return str;
}
void LogMessage(int level, const char *Format, ...)
{
// 级别,时间,pid
char left[2048];
std::string level_str = GetLevelStr(level);
std::string time_str = GetTimeStr();
pid_t id = getpid();
snprintf(left, sizeof(left), "[%s][%s][%d]", level_str.c_str(), time_str.c_str(), id);
char right[2048];
va_list p;
va_start(p, Format);
vsnprintf(right, sizeof(right), Format, p);
va_end(p);
printf("%s: %s\n",left,right);
}
}
sock.hpp
#pragma once
#include <iostream>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <cstring>
#include <errno.h>
#include "log.hpp"
#include "error.hpp"
#define BACKLOG_DEFAULT 32
namespace sock_space
{
class Sock
{
public:
Sock() : _fd(-1)
{
}
// 获取套接字
void Socket()
{
_fd = socket(AF_INET, SOCK_STREAM, 0);
if (_fd < 0)
{
log_space::LogMessage(log_space::FATAL, "socket error,code:%d,%s", errno, strerror(errno));
exit(SOCK_ERR);
}
log_space::LogMessage(log_space::INFO,"socket success");
}
// 绑定
void Bind(const uint16_t &port)
{
struct sockaddr_in client;
memset(&client, 0, sizeof(client));
client.sin_addr.s_addr = htons(INADDR_ANY);
client.sin_family = AF_INET;
client.sin_port = htons(port);
if (bind(_fd, (struct sockaddr *)&client, sizeof(client)) == -1)
{
log_space::LogMessage(log_space::FATAL, "bind error,code:%d,%s", errno, strerror(errno));
exit(BIND_ERR);
}
log_space::LogMessage(log_space::INFO,"bind success");
}
// 监听
void Listen()
{
int n = listen(_fd, BACKLOG_DEFAULT);
if (n == -1)
{
log_space::LogMessage(log_space::FATAL, "listen error,code:%d,%s", errno, strerror(errno));
exit(LISTEN_ERR);
}
log_space::LogMessage(log_space::INFO,"listen success");
}
// 获取链接
int Accept(std::string &client_ip, uint16_t &client_port)
{
// 获取io套接字
struct sockaddr_in client;
socklen_t len = sizeof(client);
int sockfd = accept(_fd, (struct sockaddr *)&client, &len);
if (sockfd == -1)
{
log_space::LogMessage(log_space::ERROR, "accept fail,relink......");
}
log_space::LogMessage(log_space::INFO,"accept success");
// 输出客户端ip和port
client_ip = inet_ntoa(client.sin_addr);
client_port = ntohs(client.sin_port);
return sockfd;
}
// 链接
int Connect(const string &server_ip, const uint16_t &server_port)
{
// 设置服务器地址与端口结构体
struct sockaddr_in server;
memset(&server, 0, sizeof(server));
server.sin_family = AF_INET;
server.sin_addr.s_addr = inet_addr(server_ip.c_str());
server.sin_port = htons(server_port);
// 向服务器申请链接
int n = connect(_fd, (struct sockaddr *)&server, sizeof(server));
if (n == -1)
{
log_space::LogMessage(log_space::ERROR, "connect fail,relink......");
}
log_space::LogMessage(log_space::INFO,"connect success");
return n;
}
int get_sockfd() { return _fd; }
~Sock()
{
close(_fd);
}
private:
int _fd;
};
}
多路转接IO的本质是一次性等待多个fd就绪,所以要将每个需要检测是否就绪的文件描述符以及套接字都交给select处理
首先创建套接字,设置监听状态,listen套接字需要等待有客户端链接时才能获取连接,所以需要将listen套接字交给select处理
listen套接字就绪后获取IO套接字,IO套接字需要等待系统接收缓冲区就绪了才能拷贝数据,所以也需要交给select处理
然而,select是以位图的方式汇聚所有要检测是否就绪的fd传参的,而且参数是输入输出型参数,函数返回时,位图会被设置成只有就绪套接字的位图.下次再调用select需要重新将fd重新设置到fd_set位图对象中去.所以每次被设置的fd需要先保存在一个固定数组中,而且这个fd可能是读操作也可能是写操作,所以还要保存它的属性.所以这个数组成员必须是一个结构体.
select_server.hpp
#pragma once
#include "sock.hpp"
#include "log.hpp"
#include "error.hpp"
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
#include <string>
#include <cerrno>
#include <cstring>
#include <queue>
// 默认端口号
static const uint16_t default_port = 8080;
// IO事件信息数组长度
static const int N = (sizeof(fd_set) * 8);
// IO事件信息
struct FdEvent
{
int _fd;
short _Event;
std::string _ip;
uint16_t _port;
};
enum
{
READ = 1,
WRITE,
};
class select_server
{
typedef FdEvent type_t;
public:
select_server(uint16_t port = default_port) : _port(port)
{
for (int i = 0; i < N; i++)
{
_fd_array[i]._fd = -1;
_fd_array[i]._Event = 0;
_fd_array[i]._port = 0;
}
}
// 初始化
void init()
{
// 创建套接字
_sock.Socket();
// 绑定ip和port
_sock.Bind(_port);
// 设置套接字监听状态
_sock.Listen();
}
// 监听就绪处理
void HandlerListenReady()
{
// 获取IO套接字
std::string client_ip;
uint16_t client_port = 0;
int sockfd = _sock.Accept(client_ip, client_port);
log_space::LogMessage(log_space::INFO,"链接%s::%d成功",client_ip.c_str(),client_port);
// 检测fd数组空位
int pos = 1;
for (; pos < N; pos++)
{
if (_fd_array[pos]._fd == -1)
break;
}
// fd_set空间用尽
if (pos >= N)
{
close(sockfd);
}
// 有空间,将IO套接字设置进fd数组
_fd_array[pos]._fd = sockfd;
_fd_array[pos]._Event = READ;
_fd_array[pos]._ip = client_ip;
_fd_array[pos]._port = client_port;
}
// 读就绪处理
void HandlerReadReady(int index)
{
char buffer[1024];
int n = read(_fd_array[index]._fd, buffer, sizeof(buffer) - 1);
if (n > 0)
{
buffer[n] = 0;
std::cout <<_fd_array[index]._ip<<"::"<<_fd_array[index]._port<<"# "<<buffer << std::endl;
std::string sdata="server > ";
sdata+=buffer;
write(_fd_array[index]._fd, sdata.c_str(), sdata.size());
}
else
{
if (n < 0)
{
log_space::LogMessage(log_space::ERROR, "read error,error code:%d,error info%s", errno, strerror(errno));
}
else
{
log_space::LogMessage(log_space::INFO, "client quit");
}
close(_fd_array[index]._fd);
_fd_array[index]._fd = -1;
_fd_array[index]._Event = 0;
_fd_array[index]._ip.clear();
_fd_array[index]._port = 0;
}
}
/* // 写就绪处理
void HandlerWriteReady(int index)
{
std::string recv_data = _data.front();
_data.pop();
write(_fd_array[index]._fd, recv_data.c_str(), recv_data.size());
} */
// 统一处理就绪事件
void HandlerListenFd(fd_set &rfds, fd_set &wrfds)
{
// 检测就绪的IO事件
for (int i = 0; i < N; i++)
{
int fd = _fd_array[i]._fd;
short event = _fd_array[i]._Event;
// 监听套接字就绪
if (fd == _sock.get_sockfd() && FD_ISSET(fd, &rfds))
{
log_space::LogMessage(log_space::INFO,"fd->%d,监听就绪");
HandlerListenReady();
}
// 读套接字就绪
else if (fd != -1 && FD_ISSET(fd, &rfds))
{
log_space::LogMessage(log_space::INFO,"fd->%d,读就绪");
HandlerReadReady(i);
}
/* else if (fd != -1 && FD_ISSET(fd, &wrfds))
{
log_space::LogMessage(log_space::INFO,"fd->%d,写就绪");
HandlerWriteReady(i);
} */
else
{}
}
}
// 运行
void start()
{
// listenfd加入数组
_fd_array[0]._fd = _sock.get_sockfd();
_fd_array[0]._Event = READ;
while (true)
{
// 将需要检测的fd交给内核
fd_set rfds, wrfds;
timeval t;
t.tv_sec = 5;
int maxfd = 0;
FD_ZERO(&rfds);
FD_ZERO(&wrfds);
for (int i = 0; i < N; i++)
{
if (_fd_array[i]._fd != -1 && (_fd_array[i]._Event & READ))
{
FD_SET(_fd_array[i]._fd, &rfds);
}
if (_fd_array[i]._fd != -1 && (_fd_array[i]._Event & WRITE))
{
FD_SET(_fd_array[i]._fd, &wrfds);
}
if (maxfd < _fd_array[i]._fd)
maxfd = _fd_array[i]._fd;
}
//打印fd
std::cout<<"fd: ";
for (auto a : _fd_array)
{
if (a._fd != -1)
std::cout << a._fd << " ";
}
std::cout << std::endl;
// 每次阻塞等待5秒钟
int s = select(maxfd+1, &rfds, &wrfds, nullptr, nullptr);
// 就绪
if (s > 0)
{
HandlerListenFd(rfds, wrfds);
}
else if (s == 0)
{
log_space::LogMessage(log_space::INFO, "timeout error code:%d,error info:%s",errno,strerror(errno));
}
else
{
log_space::LogMessage(log_space::ERROR, "select error,error code: %d error info: %s", errno, strerror(errno));
}
}
}
private:
uint16_t _port;
sock_space::Sock _sock;
type_t _fd_array[N];
std::queue<std::string> _data;
};
select的优缺点
每次调用select, 都需要手动设置fd集合, 从接口使用角度来说也非常不便.
每次调用select,都需要把fd集合从用户态拷贝到内核态,这个开销在fd很多时会很大
同时每次调用select都需要在内核遍历传递进来的所有fd,这个开销在fd很多时也很大
每次检测数组中那个fd就绪需要遍历这个数组.
select支持的文件描述符数量太小.
四、多路转接IO之poll应用
1.poll接口介绍
#include <poll.h>
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
功能:功能:程序一次等待多个文件描述符,这个文件描述符可以是用来链接的或者是IO数据的,直到这个文件描述符的IO事件为就绪状态.
参数:fds是一个poll函数监听的结构列表. 每一个元素中, 包含了三部分内容: 文件描述符, 监听的事件集合, 返回的事件集合.
pollfd结构
struct pollfd
{
int fd; /* file descriptor */
short events; /* requested events */
short revents; /* returned events */
}
nfds表示fds数组的长度.
timeout表示poll函数的超时时间, 单位是毫秒(ms).
返回值:等于0, 表示poll函数等待超时;大于0, 表示poll由于监听的文件描述符就绪而返回,小于0,表示出错
2.使用poll实现多路转接IO
除了server.hpp改变,其他的与select相同
poll_server.hpp
#pragma once
#include "sock.hpp"
#include "log.hpp"
#include "error.hpp"
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
#include <string>
#include <cerrno>
#include <cstring>
#include <queue>
#include <poll.h>
// 默认端口号
static const uint16_t default_port = 8080;
// IO事件信息数组长度
static const int N = 4096;
class poll_server
{
typedef struct pollfd type_t;
public:
poll_server(uint16_t port = default_port) : _port(port)
{
}
// 初始化
void init()
{
// 创建套接字
_sock.Socket();
// 绑定ip和port
_sock.Bind(_port);
// 设置套接字监听状态
_sock.Listen();
_fd_array = new type_t[N];
for (int i = 0; i < N; i++)
{
_fd_array[i].fd = -1;
_fd_array[i].events = 0;
_fd_array[i].revents = 0;
}
}
// 监听就绪处理
void HandlerListenReady()
{
// 获取IO套接字
std::string client_ip;
uint16_t client_port = 0;
int sockfd = _sock.Accept(client_ip, client_port);
log_space::LogMessage(log_space::INFO, "链接%s::%d成功", client_ip.c_str(), client_port);
// 检测fd数组空位
int pos = 1;
for (; pos < N; pos++)
{
if (_fd_array[pos].fd == -1)
break;
}
// fd_set空间用尽
if (pos >= N)
{
close(sockfd);
}
// 有空间,将IO套接字设置进fd数组
_fd_array[pos].fd = sockfd;
_fd_array[pos].events = POLLIN;
}
// 读就绪处理
void HandlerReadReady(int index)
{
char buffer[1024];
int n = read(_fd_array[index].fd, buffer, sizeof(buffer) - 1);
if (n > 0)
{
buffer[n] = 0;
std::cout << "client# " << buffer << std::endl;
std::string sdata = "server > ";
sdata += buffer;
write(_fd_array[index].fd, sdata.c_str(), sdata.size());
}
else
{
if (n < 0)
{
log_space::LogMessage(log_space::ERROR, "read error,error code:%d,error info%s", errno, strerror(errno));
}
else
{
log_space::LogMessage(log_space::INFO, "client quit");
}
close(_fd_array[index].fd);
_fd_array[index].fd = -1;
_fd_array[index].events = 0;
}
}
/* // 写就绪处理
void HandlerWriteReady(int index)
{
std::string recv_data = _data.front();
_data.pop();
write(_fd_array[index]._fd, recv_data.c_str(), recv_data.size());
} */
// 统一处理就绪事件
void HandlerListenFd()
{
// 检测就绪的IO事件
for (int i = 0; i < N; i++)
{
int fd = _fd_array[i].fd;
short event = _fd_array[i].events;
// 监听套接字就绪
if (fd == _sock.get_sockfd() && _fd_array[i].revents & POLLIN)
{
log_space::LogMessage(log_space::INFO, "fd->%d,监听就绪");
HandlerListenReady();
}
// 读套接字就绪
else if (fd != _sock.get_sockfd() && _fd_array[i].revents & POLLIN)
{
log_space::LogMessage(log_space::INFO, "fd->%d,读就绪");
HandlerReadReady(i);
}
/* else if (fd != -1 && FD_ISSET(fd, &wrfds))
{
log_space::LogMessage(log_space::INFO,"fd->%d,写就绪");
HandlerWriteReady(i);
} */
else
{
}
}
}
// 运行
void start()
{
// listenfd加入数组
_fd_array[0].fd = _sock.get_sockfd();
_fd_array[0].events = POLLIN;
while (true)
{
// 每次阻塞等待5秒钟
int s = poll(_fd_array, N, -1);
std::cout << "fd> ";
for (int i = 0; i < N; i++)
{
if (_fd_array[i].fd != -1)
std::cout << _fd_array[i].fd << " ";
}
std::cout << std::endl;
// 判断是否就绪
if (s > 0)
{
HandlerListenFd();
}
else if (s == 0)
{
log_space::LogMessage(log_space::INFO, "timeout error code:%d,error info:%s", errno, strerror(errno));
}
else
{
log_space::LogMessage(log_space::ERROR, "select error,error code: %d error info: %s", errno, strerror(errno));
}
}
}
private:
uint16_t _port;
sock_space::Sock _sock;
type_t *_fd_array;
std::queue<std::string> _data;
};
3.poll特点
不同与select使用三个位图来表示三个fdset的方式,poll使用一个pollfd的指针实现.
pollfd结构包含了要监视的event和发生的event,不再使用select“参数-值”传递的方式. 接口使用比select更方便.
poll并没有最大数量限制 (但是数量过大后性能也是会下降).
poll的缺点
poll中监听的文件描述符数目增多时和select函数一样,poll返回后,需要轮询pollfd来获取就绪的描述符.
每次调用poll都需要把大量的pollfd结构从用户态拷贝到内核中.
同时连接的大量客户端在一时刻可能只有很少的处于就绪状态, 因此随着监视的描述符数量的增长, 其效率也会线性下降.
五、多路转接IO之epoll应用
1.epoll接口介绍
int epoll_create(int size);
功能:创建一个epoll对象,epoll是一个管理模型,也就是结构体.
参数:size任意大于零的数字
返回值:成功则返回一个非负的文件描述符,失败返回-1并设置错误码.
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
功能:对被文件描述符epfd引用的epoll对象进行op增删改操作.增删改内容包括文件描述符fd和event事件.
参数:epfd,epoll_create的返回值
op:控制操作类型,有EPOLL_CTL_ADD,EPOLL_CTL_DEL,EPOLL_CTL_MOD,分别对应增删改
fd:被epoll对象操作的文件描述符
event:被epoll对象操作的文件操作属性,是一个结构体.
typedef union epoll_data {
void *ptr;
int fd; //一般只用这个
uint32_t u32;
uint64_t u64;
} epoll_data_t;
struct epoll_event {
uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};
返回值:成功返回0,失败返回-1,错误码被设置.
int epoll_wait(int epfd, struct epoll_event *events,int maxevents, int timeout);
功能:等待被epfd引用的epoll对象中的所有事件,最多返回maxevents个可用事件.
参数:epfd是epoll_create的返回值.
events用于接收内核就绪事件的events结构体数组指针.
maxevents:最多返回事件的总数
timeout:epoll_wait最大阻塞等待的时间,单位毫秒
返回值:0代表等待超时,>0代表就绪事件的fd的个数,失败返回-1
2.epoll原理
操作系统内部维护了一个红黑树阻塞结构和一个就绪队列结构体模型,结构体模型以及结构操作方法共同组成了epoll模型,当调用epoll_create时os创建一个epoll对象,并返回一个与epoll对象建立映射关系的文件描述符epfd;
有需要管理的fd时,就调用epoll_ctl,将需要管理的fd以及事件性质作为节点添加到epoll对象管理的红黑树结构体中等待
当网卡收到网络发送过来的数据后向连接了CPU的针脚发送电流信号,产生硬件中断信号,CPU收到信号后查询中断向量表,中断向量表是一个函数指针数组,数组成员为硬件驱动程序,调用对应的硬件驱动将硬件信息拷贝到内核缓冲区中,并设置回调函数,将报文通过IP协议拷贝至传输层文件缓冲区,最后系统调用回调函数将就绪事件节点插入就绪队列.
通过调用epoll_wait将就绪文件缓冲区的数据拷贝至应用层用户缓冲区.操作系统只需要判断就绪队列是否为空,就可以判断有没有就绪的IO事件.
3.用epoll实现多路转接IO
log.hpp
#pragma once
#include <iostream>
#include <cstdio>
#include <cstdarg>
#include <sys/types.h>
#include <unistd.h>
#include <sstream>
#include <time.h>
#include<cstdio>
namespace log_space
{
enum
{
DEBUG,
INFO,
ERROR,
FATAL,
};
std::string GetLevelStr(int level)
{
std::string str;
switch (level)
{
case DEBUG:
str = "DEBUG";
break;
case INFO:
str = "INFO";
break;
case ERROR:
str = "ERROR";
break;
case FATAL:
str = "FATAL";
break;
default:
str = "UNKNOW";
break;
}
return str;
}
std::string GetTimeStr()
{
time_t _time = time(nullptr);
struct tm *pt = localtime(&_time);
char buffer[1024];
snprintf(buffer, sizeof(buffer), "%d-%d-%d_%d:%d:%d", pt->tm_year + 1900, pt->tm_mon + 1,
pt->tm_mday, pt->tm_hour, pt->tm_min, pt->tm_sec);
std::string str = buffer;
return str;
}
void LogMessage(int level, const char *Format, ...)
{
// 级别,时间,pid
char left[2048];
std::string level_str = GetLevelStr(level);
std::string time_str = GetTimeStr();
pid_t id = getpid();
snprintf(left, sizeof(left), "[%s][%s][%d]", level_str.c_str(), time_str.c_str(), id);
char right[2048];
va_list p;
va_start(p, Format);
vsnprintf(right, sizeof(right), Format, p);
va_end(p);
printf("%s: %s\n",left,right);
}
}
util.hpp
#pragma once
#include <iostream>
#include <string>
#include <vector>
#include <cstring>
#include <ctype.h>
namespace util_space
{
void split(const std::string &str, const char *sep, std::vector<std::string> &strv)
{
int start = 0;
while (start < str.size())
{
// 找分隔符位置
auto pos = str.find(sep, start);
if (pos == std::string::npos)
break;
// 分割字符串到strv
std::string result = str.substr(start, pos - start);
strv.push_back(result);
// 更新查找sep的位置
start = pos + strlen(sep);
}
// 最后一段字符可能不包含分隔符
if (start < str.size())
{
strv.push_back(str.substr(start, str.size() - start));
}
}
}
protocol.hpp
#pragma once
#include "log.hpp"
#include "util.hpp"
#include <iostream>
#include <string>
#include <cstring>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <jsoncpp/json/json.h>
namespace protocol_space
{
// 分隔符
#define SEPARATOR "\r\n"
#define SEP_LEN strlen(SEPARATOR)
// 接收数据包
int GetPackage(std::string &str, std::string &package)
{
// 查找第一个分隔符
auto pos = str.find(SEPARATOR);
if (pos == std::string::npos)
return 0;
// 获取报头
std::string header = str.substr(0, pos);
// 获取有效载荷长度
int len = atoi(header.c_str());
// 确定完整数据包长度
int overall_len = len + header.size() + SEP_LEN * 2;
// 判断str的长度是否达到完整数据包长度
if (str.size() < overall_len)
return 0;
// 截取完整数据包到str中去
package = str.substr(0, overall_len);
// 删除str中已经被复制截取的部分
str.erase(0, overall_len);
return len;
}
// 去除报头
void RemoveHeader(std::string &str, int len)
{
std::cout << "去除报头之前: "<<std::endl;
std::cout<< str << std::endl;
str = str.substr(str.size() - SEP_LEN - len, len);
std::cout << "去除报头之后: "<<std::endl;
std::cout<< str << std::endl;
}
// 添加报头
void AddHeader(std::string &str)
{
std::cout << "添加报头之前: "<<std::endl;
std::cout<< str << std::endl;
std::string strlen = std::to_string(str.size());
str = strlen + SEPARATOR + str + SEPARATOR;
std::cout << "添加报头之后: "<<std::endl;
std::cout << str << std::endl;
}
// 需求
class request
{
public:
request() : _x(-1), _y(-1), _op(' ')
{
}
request(int x, int y, char op) : _x(x), _y(y), _op(op)
{
}
// 反序列化
void deserializetion(const std::string &str)
{
#ifdef MYSELF
// 分割
std::vector<std::string> strv;
util_space::split(str, " ", strv);
for (auto e : strv)
{
std::cout << e << " ";
}
std::cout << std::endl;
// 转换
_x = atoi(strv[0].c_str());
_op = strv[1][0];
_y = atoi(strv[2].c_str());
#else
// 从将序列化信息分割填充在root结构体中
Json::Value root;
Json::Reader read;
read.parse(str, root);
// 从root中提取结构化信息
_x = root["x"].asInt();
_op = root["op"].asInt();
_y = root["y"].asInt();
#endif
}
void print()
{
std::cout << "x: " << _x << std::endl;
std::cout << "y: " << _y << std::endl;
std::cout << "op: " << _op << std::endl;
}
// 序列化
void serializetion(std::string &str)
{
#ifdef MYSELF
str = std::to_string(_x) + ' ' + _op + ' ' + std::to_string(_y);
#else
// 将信息填充在root结构中
Json::Value root;
root["x"] = _x;
root["y"] = _y;
root["op"] = _op;
// 用root结构体将信息序列化到str中
Json::StyledWriter writer;
// Json::FastWriter writer;
str = writer.write(root);
#endif
}
~request()
{
}
public:
int _x;
int _y;
char _op;
};
// 回应
class response
{
public:
response() : _result(-1), _code(-1)
{
}
response(int result, int code) : _result(result), _code(code)
{
}
// 反序列化
void deserializetion(const std::string &str)
{
#ifdef MYSELF
// 分割
std::vector<std::string> strv;
util_space::split(str, " ", strv);
// 转换
_result = atoi(strv[0].c_str());
_code = atoi(strv[1].c_str());
#else
// 从将序列化信息分割填充在root结构体中
Json::Value root;
Json::Reader reader;
reader.parse(str, root);
// 从root中提取结构化信息
_result = root["result"].asInt();
_code = root["code"].asInt();
#endif
}
// 序列化
void serializetion(std::string &str)
{
#ifdef MYSELF
str = std::to_string(_result) + ' ' + std::to_string(_code);
#else
// 将信息填充在root结构中
Json::Value root;
root["result"] = _result;
root["code"] = _code;
// 用root结构体将信息序列化到str中
Json::StyledWriter writer;
// Json::FastWriter writer;
str = writer.write(root);
#endif
}
void print()
{
std::cout << "result: " << _result << std::endl;
std::cout << "code: " << _code << std::endl;
}
~response()
{
}
public:
int _result;
int _code;
};
}
sock.hpp
#pragma once
#include <iostream>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <cstring>
#include <errno.h>
#include "log.hpp"
#include "error.hpp"
#define BACKLOG_DEFAULT 32
namespace sock_space
{
class Sock
{
public:
Sock() : _fd(-1)
{
}
// 获取套接字
void Socket()
{
_fd = socket(AF_INET, SOCK_STREAM, 0);
if (_fd < 0)
{
log_space::LogMessage(log_space::FATAL, "socket error,code:%d,%s", errno, strerror(errno));
exit(SOCK_ERR);
}
}
// 绑定
void Bind(const uint16_t &port)
{
struct sockaddr_in client;
memset(&client, 0, sizeof(client));
client.sin_addr.s_addr = htons(INADDR_ANY);
client.sin_family = AF_INET;
client.sin_port = htons(port);
if (bind(_fd, (struct sockaddr *)&client, sizeof(client)) == -1)
{
log_space::LogMessage(log_space::FATAL, "bind error,code:%d,%s", errno, strerror(errno));
exit(BIND_ERR);
}
log_space::LogMessage(log_space::INFO,"bind success");
}
// 监听
void Listen()
{
int n = listen(_fd, BACKLOG_DEFAULT);
if (n == -1)
{
log_space::LogMessage(log_space::FATAL, "listen error,code:%d,%s", errno, strerror(errno));
exit(LISTEN_ERR);
}
log_space::LogMessage(log_space::INFO,"listen success");
}
// 获取链接
int Accept(std::string &client_ip, uint16_t &client_port,int& err)
{
// 获取io套接字
struct sockaddr_in client;
socklen_t len = sizeof(client);
int sockfd = accept(_fd, (struct sockaddr *)&client, &len);
if (sockfd == -1&& (errno!=EAGAIN||errno!=EWOULDBLOCK)&&errno!=EINTR)
{
log_space::LogMessage(log_space::ERROR, "accept fail,error code:%d,error info:%s",errno,strerror(errno));
}
err=errno;
// 输出客户端ip和port
client_ip = inet_ntoa(client.sin_addr);
client_port = ntohs(client.sin_port);
return sockfd;
}
// 链接
int Connect(const string &server_ip, const uint16_t &server_port)
{
// 设置服务器地址与端口结构体
struct sockaddr_in server;
memset(&server, 0, sizeof(server));
server.sin_family = AF_INET;
server.sin_addr.s_addr = inet_addr(server_ip.c_str());
server.sin_port = htons(server_port);
// 向服务器申请链接
int n = connect(_fd, (struct sockaddr *)&server, sizeof(server));
if (n == -1)
{
log_space::LogMessage(log_space::ERROR, "connect fail,relink......");
}
log_space::LogMessage(log_space::INFO,"connect success");
return n;
}
int ListenFd() { return _fd; }
~Sock()
{
close(_fd);
}
private:
int _fd;
};
}
epoll_client.cc
#include "sock.hpp"
#include "protocol.hpp"
#include <cstdlib>
enum Stat
{
LEFT,
MIDDLE,
RIGHT,
};
protocol_space::request FormatStr(std::string str)
{
Stat s = LEFT;
std::string x, y, op;
for (int i = 0; i < str.size();)
{
switch (s)
{
case LEFT:
{
if (isdigit(str[i]))
{
x.push_back(str[i++]);
}
else
{
s = MIDDLE;
}
}
break;
case MIDDLE:
{
op = str[i++];
s = RIGHT;
}
break;
default:
{
y.push_back(str[i++]);
}
break;
}
}
protocol_space::request req;
req._x = atoi(x.c_str());
req._y = atoi(y.c_str());
req._op = op[0];
return req;
}
int main(int argc, char *argv[])
{
// 创建套接字
sock_space::Sock _sock;
_sock.Socket();
// 链接服务器
int N = 5;
std::string serverip = argv[1];
uint16_t serverport = atoi(argv[2]);
while (_sock.Connect(serverip, serverport) == -1)
{
--N;
if (N <= 0)
{
exit(0);
}
}
while (true)
{
// 获取用户输入
cout << "输入操作 ";
std::string str;
getline(std::cin, str);
// 格式化
protocol_space::request req = FormatStr(str);
// 序列化
std::string sendstr;
req.serializetion(sendstr);
// 加报头
protocol_space::AddHeader(sendstr);
// 发送数据
ssize_t sn = send(_sock.ListenFd(), sendstr.c_str(), sendstr.size(), 0);
if (sn < 0)
{
log_space::LogMessage(log_space::ERROR, "send error,error code:%d,error info:%s", errno, strerror(errno));
}
// 接收数据
std::string recvstr;
std::string buffer(1024, '0');
ssize_t rn = recv(_sock.ListenFd(), (char *)buffer.c_str(), buffer.size(), 0);
if (rn > 0)
{
buffer.resize(rn);
}
else if (rn == 0)
{
log_space::LogMessage(log_space::INFO, "server quit");
return 0;
}
else
{
log_space::LogMessage(log_space::ERROR, "recv error,error code:%d,error info:%s", errno, strerror(errno));
exit(-1);
}
int n = protocol_space::GetPackage(buffer, recvstr);
if (n == 0)
continue;
else
{
// 去报头
protocol_space::RemoveHeader(recvstr, n);
// 反序列化
protocol_space::response resp;
resp.deserializetion(recvstr);
// 输出信息
std::cout << "server recv# "
<< "result=" << resp._result << " "
<< "code=" << resp._code << std::endl;
}
}
return 0;
}
epoll_server.cc
#include "epoll_server.hpp"
#include "protocol.hpp"
#include <memory>
#include <cstdio>
#include<stdlib.h>
// 信息处理
protocol_space::response Calculate(const protocol_space::request &req)
{
// 计算
protocol_space::response resp;
switch (req._op)
{
case '+':
resp._result = req._x + req._y;
resp._code = 0;
break;
case '-':
resp._result = req._x - req._y;
resp._code = 0;
break;
case '*':
resp._result = req._x * req._y;
resp._code = 0;
break;
case '/':
{
if (req._y == 0)
{
resp._result = 0;
resp._code = -1;
}
else
{
resp._result = req._x / req._y;
resp._code=0;
}
}
break;
case '%':
{
if (req._y == 0)
{
resp._result = 0;
resp._code = -1;
}
else
{
resp._result = req._x % req._y;
resp._code=0;
}
}
break;
default:
{
resp._result=0;
resp._code=-1;
}
break;
}
return resp;
}
int main(int argc, char *argv[])
{
// 获取port
uint16_t port = atoi(argv[1]);
// 运行server
std::unique_ptr<EpollServer> es(new EpollServer(Calculate, port));
es->Init();
es->Dispatcher();
}
epoll_server.hpp
#pragma once
#include "sock.hpp"
#include "log.hpp"
#include "error.hpp"
#include "epoll.hpp"
#include "protocol.hpp"
#include <sys/time.h>
#include <unistd.h>
#include <fcntl.h>
#include <string>
#include <cerrno>
#include <cstring>
#include <unordered_map>
#include <functional>
// 默认端口号
static const uint16_t default_port = 8080;
// IO事件信息数组长度
static const int gnum = 4096;
// IO事件信息类型
typedef epoll_event type_t;
// 回调类型
class EpollServer;
class Connect;
using callback_t = std::function<void(Connect *)>;
using func_t = function<protocol_space::response(const protocol_space::request &req)>;
// 链接信息类
class Connect
{
public:
Connect(int fd, uint32_t events) : _fd(fd), _events(events)
{
}
void Register(callback_t recv, callback_t send, callback_t exception)
{
_recv = recv;
_send = send;
_exception = exception;
}
~Connect()
{
}
public:
// IO事件信息
int _fd;
uint32_t _events;
// 用户缓冲区
std::string _inbuffer;
std::string _outbuffer;
// 客户端信息
std::string _clientip;
uint16_t _clientport;
// IO处理函数
callback_t _recv;
callback_t _send;
callback_t _exception;
};
class EpollServer
{
public:
EpollServer(func_t func, uint16_t port = default_port) : _func(func), _port(port)
{
}
// 初始化
void Init()
{
// 创建套接字
_sock.Socket();
// 绑定ip和port
_sock.Bind(_port);
// 设置套接字监听状态
_sock.Listen();
// 创建epoll对象
_epoll.EpollCreate();
// listenfd添加到内核
AddLinkInformation(_sock.ListenFd(), EPOLLIN | EPOLLET);
}
// 事件派发器
void Dispatcher()
{
while (true)
{
int timeout = 1000;
LoopOnce(timeout);
}
}
// 循环一次
void LoopOnce(int timeout)
{
int readynumber = _epoll.EpollWait(_readyevents, gnum, timeout);
// 派发事件
for (int i = 0; i < readynumber; i++)
{
int fd = _readyevents[i].data.fd;
uint32_t event = _readyevents[i].events;
log_space::LogMessage(log_space::INFO, "当前正在处理%d上的%s", fd, (event & EPOLLIN) ? "EPOLLIN" : "OTHER");
// 文件错误或被挂起
if ((event & EPOLLERR) || (event & EPOLLHUP))
{
log_space::LogMessage(log_space::ERROR, "fd->%d exception,error code:%d,error info:%s", errno, strerror(errno));
}
// 读事件就绪
if ((event & EPOLLIN) && ConnIsExist(fd))
{
_connects[fd]->_recv(_connects[fd]);
}
// 写事件就绪
if ((event & EPOLLOUT) && ConnIsExist(fd))
{
_connects[fd]->_send(_connects[fd]);
}
}
}
// 监听就绪处理
void AcceptLink(Connect *conn)
{
// 获取全部链接
do
{
std::string clientip;
uint16_t clientport;
int err = 0;
// 获取链接
int sockfd = _sock.Accept(clientip, clientport, err);
if (sockfd > 0)
{
log_space::LogMessage(log_space::INFO, "链接%s::%d成功", clientip.c_str(), clientport);
// 添加I事件fd到内核
AddLinkInformation(sockfd, EPOLLIN | EPOLLET, clientip, clientport);
}
else
{
// 套接字被标记为非阻塞,并且不存在可接受的连接
if ((err & EAGAIN) || (err & EWOULDBLOCK))
break;
// 系统调用被在有效连接到达之前捕获的信号中断;
else if (err & EINTR)
continue;
// 出错
else
{
log_space::LogMessage(log_space::ERROR, "accept error,error code:%d,error info", err, strerror(err));
continue;
}
}
} while (conn->_events & EPOLLET);
}
// 协议处理
void ProtocolHandler(Connect *conn)
{
// 分割并处理报文
bool quit = false;
while (!quit)
{
// 提取完整报文
std::string package;
int len = protocol_space::GetPackage(conn->_inbuffer, package);
if (len > 0)
{
// 去除报头
protocol_space::RemoveHeader(package, len);
// 反序列化
protocol_space::request req;
req.deserializetion(package);
// 业务处理
protocol_space::response resp = _func(req);
// 序列化
std::string sendstr;
resp.serializetion(sendstr);
// 添加报头
protocol_space::AddHeader(sendstr);
// 待发送报文,添加到connect缓冲区
conn->_outbuffer += sendstr;
}
else
{
quit = true;
}
}
}
// 读数据
bool RecvHelper(Connect *conn)
{
bool ret = true;
do
{
// 读取所有报文
char buffer[1024];
int n = recv(conn->_fd, buffer, sizeof(buffer) - 1, 0);
// 读到了数据
if (n > 0)
{
buffer[n] = 0;
conn->_inbuffer += buffer;
}
// 数据读到了结尾
else if (n == 0)
{
log_space::LogMessage(log_space::ERROR, "client quit");
conn->_exception(conn);
ret = false;
break;
}
// 异常情况
else
{
// 接收缓冲区没有就绪
if (errno == EAGAIN || errno == EWOULDBLOCK)
break;
// 在系统调用结束之前被信号打断
else if (n == EINTR)
continue;
// 出错
else
{
log_space::LogMessage(log_space::ERROR, "recv error,error code:%d,error info:%s", errno, strerror(errno));
conn->_exception(conn);
ret = false;
break;
}
}
} while (conn->_events & EPOLLET);
return ret;
}
// 读就绪处理
void Recv(Connect *conn)
{
// 读数据
if (RecvHelper(conn) == false)
return;
// 协议处理
ProtocolHandler(conn);
// 立即发送
if (!conn->_outbuffer.empty())
conn->_send(conn);
}
// 写就绪处理
void Send(Connect *conn)
{
do
{
// 发送数据
ssize_t n = send(conn->_fd, conn->_outbuffer.c_str(), conn->_outbuffer.size(), 0);
// 发送成功
if (n > 0)
{
// 删除已经发送的数据
conn->_outbuffer.erase(0, n);
// 发送缓冲区已清空
if (conn->_outbuffer.empty())
break;
}
else
{
// 对端接收缓冲区未就绪
if (errno == EAGAIN || errno == EWOULDBLOCK)
{
break;
}
// 被信号中断
else if (errno == EINTR)
continue;
// 出错
else
{
log_space::LogMessage(log_space::ERROR, "send error,error code:%d,error info:%s", errno, strerror(errno));
conn->_exception(conn);
return;
}
}
} while (conn->_events & EPOLLET);
// 没发完,加入等待队列
if (!conn->_outbuffer.empty())
{
EnableWrite(conn, true);
}
// 从等待队列去除
else if(conn->_events & EPOLLOUT)
{
EnableWrite(conn, false);
}
}
// 异常处理
void Exception(Connect *conn)
{
// 从内核移除IO事件
_epoll.EpollDel(conn->_fd, conn->_events);
// 关闭fd
close(conn->_fd);
// 从链接管理中移除链接信息
_connects.erase(conn->_fd);
// 析构链接信息节点对象空间
delete conn;
conn == nullptr;
log_space::LogMessage(log_space::INFO, "exception handler success");
}
// 增加链接信息
void AddLinkInformation(int fd, uint32_t event, const std::string &ip = "127.0.0.1", const uint16_t &port = default_port)
{
// 设置fd为非阻塞状态
// if(event & EPOLLET)
int fl = fcntl(fd, F_GETFL);
if (fl < 0)
{
log_space::LogMessage(log_space::ERROR, "fcntl error,error code:%d,error info:%s", errno, strerror(errno));
return;
}
int n = fcntl(fd, F_SETFL, fl | O_NONBLOCK);
if (n < 0)
{
log_space::LogMessage(log_space::ERROR, "fcntl error,error code:%d,error info:%s", errno, strerror(errno));
return;
}
// 构造链接信息类对象
Connect *conn = new Connect(fd, event);
conn->_clientip = ip;
conn->_clientport = port;
if (fd == _sock.ListenFd())
{
conn->_recv = std::bind(&EpollServer::AcceptLink, this, std::placeholders::_1);
conn->_send = nullptr;
conn->_exception = nullptr;
}
else
{
conn->_recv = std::bind(&EpollServer::Recv, this, std::placeholders::_1);
conn->_send = std::bind(&EpollServer::Send, this, std::placeholders::_1);
conn->_exception = std::bind(&EpollServer::Exception, this, std::placeholders::_1);
}
// 记录链接信息对象
_connects.insert(std::make_pair(fd, conn));
// 向内核添加信息
_epoll.EpollAdd(fd, event);
}
// 判断fd是否存在
bool ConnIsExist(int fd)
{
return _connects.find(fd) != _connects.end();
}
// 使能write
void EnableWrite(Connect *conn, bool write)
{
conn->_events = EPOLLIN | (write ? EPOLLOUT : 0) | EPOLLET;
_epoll.EpollMod(conn->_fd, conn->_events);
if (conn->_events & EPOLLOUT)
{
log_space::LogMessage(log_space::INFO, "write enable ");
}
else
{
log_space::LogMessage(log_space::INFO, "close write enable");
}
}
~EpollServer()
{
close(_sock.ListenFd());
_epoll.Close();
}
private:
// 端口号
uint16_t _port;
// 网络套接字类
sock_space::Sock _sock;
// 就绪链接信息缓冲区
type_t _readyevents[gnum];
// IO轮巡类
Epoll _epoll;
// 链接类
std::unordered_map<int, Connect *> _connects;
// 业务处理回调
func_t _func;
};
4.reactor服务器概念
reactor服务器:基于多路转接的包含dispatcher(事件派发器),Connection(链接管理器)半同步半异步的服务器.
链接管理器:
用Connect对象管理链接.Connect对象在链接或者IO事件到来时创建,Connect对象包含链接或者IO事件的文件描述符,事件类型,客户端IP和端口号,事件所对应的回调函数,将Connect对象中的fd作为key,Connect对象作为value映射到哈希表中管理.在有链接或者IO事件到来或者关闭的时候插入删除或者修改节点.
超时管理:
给每个Connect设置time_t lastime(最后访问时间),设置初始值为当前时间time(nullptr),并设置超时时间time_t timeout;在每次链接访问即将结束的时候更新访问时间为当前时间.事件派发一次结束后检测一次,如果lasttime+timeout>time(nullptr)则证明还没有超时,否则为超时,直接将时间按照异常处理即可.
事件派发器:
用来轮巡检测多个IO或者是链接事件,包括读/写/链接获取事件,并负责链接的超时管理.reactor用epoll结构管理多个IO事件或者链接,在初始化的时候创建epoll对象,在有链接或者IO事件到来或者关闭的时候插入删除或者修改节点.将就绪的节点用epoll_events结构体数组管理,统一派发,在析构的时候释放.
5.epoll工作模式
epoll工作模式有两种:level triggered(水平触发)模式,简称LT模式和edge triggered(边缘触发)模式,简称ET模式;
LT模式:,传输层接收缓冲区或者链接就绪队列中有数据未被取走,epoll会一直发送事件就绪通知,读取数据时可以不用关心数据一次拷贝不完,因为下一次epoll还会通知,这种模式下的IO效率是比较低的,epoll每次通知相当于一次调用返回,每次返回意味着一次系统调用,时间成本比较高.但是向应用层响应的效率比较高.适用于高响应的应用场景
ET模式:事件就绪时,epoll不会像LT模式一样一直通知,而是只通知一次,这就要求程序编写时,读数据要设置循环读取,这样会引发一个问题,如果底层没有数据就绪时,读系统调用会阻塞,服务器将会停止运行,影响其他模块正常运行.所以要给文件描述符设置非阻塞状态,虽然recv和send这样的系统调用有非阻塞和阻塞选项但是accept没有这样的选项,所以还是用fcntl文件描述符控制系统调用统一设置文件描述符.这样在底层没有数据的时候就不会阻塞在读系统调用上了,但是这种情况不能将返回值为-1的情况一概而论,当错误码为EAGAIN和EWOULDBLOCK时,意思是fd被设置非阻塞,但是内核接收缓冲区没有就绪数据,说明数据读完了,要跳出循环继续其他任务.
IO高效性
一次性将内核接收缓冲区数据接收,其本质是快速应用层在快速清空内核接收缓冲区,给对端提供更大的滑动窗口或者拥塞窗口,提高数据吞吐量.虽然LT模式下也可以设置循环非阻塞读取数据,但是这在编程的角度上来说不是强制的,可能因为编写问题,数据没有一次读完,仍然不影响程序向后运行.但是ET是强制性的,不一次读完,程序会出现bug.所以ET模式IO效率>=LT模式IO效率.
六、reactor服务器模式设计
1.多线程模式设计理念
主线程和线程池为大框架,每个线程都管理一个epoll_server服务器对象,在线程池中构建一个任务队列,用于管理connect对象,主线程充当生产者,初始化listen套接字,并负责监听listen套接字,将就绪的套接字添加到任务队列中;线程池各线程充当消费者,竞争任务队列中就绪的listen套接字,拿到listen套接字,用accept获取连接,各线程自己管理自己的sockfd.互不干扰.
2.master slaver模式设计理念
以主进程和一个进程池为大框架,主进程通过管道连接各个子进程,用管道文件读端控制子进程的运行,主进程负责将初始化listen套接字和监视listen套接字,子进程负责处理就绪的套接字获取链接,当主进程listen套接字就绪,就像任意管道发送任意数据,激活任意子进程,让子进程获取链接,处理链接.