文章目录
- 一、五种网络IO模型
- 1.数据传输过程
- 2.两组重要概念
- 3.五种网络IO模型
- (1)阻塞式IO
- (2)非阻塞式IO
- (3)IO多路复用
- (4)信号驱动IO
- (5)异步IO
- 4.五种网络IO模型的对比
- 5.举例说明
- 二、并发服务器模型
- 1.循环式迭代式模式
- 2.并发式服务器
- 3.prefork服务器
- 4.反应式服务器 (Reactor)
- 5.反应式 + 线程池型服务器
- 6.多反应式服务器
- 7.多反应式 + 线程池模型
- 三、Reactor模型
- 0.Reactor概述
- (1)概述
- (2)Reactor总结
- 1.ReactorV1版本
- (1)类的设计
- (2)类图设计
- (3)函数的设计
- (4)代码
- 2.ReactorV2版本
- (1)类图设计
- (2)伪代码
- (3)代码
- (4)TCP网络编程中的三个半事件
- (5)回调函数的注册、执行。将连接建立、消息到达、连接断开三个回调函数写到test中
- 3.ReactorV3版本
- (1)类图
- (2)代码
- 4.ReactorV4版本
- (1)流程梳理
- (2)类图设计
- 5.ReactorV5版本
- (1)类图
- (2)代码
- 6.进程线程通信方式 eventfd
- (1)作用
- (2)函数接口
- (3)eventfd支持的操作
- (4)进程之间通信
- (5)线程之间通信
- 7.定时器:timerfd的封装
- (1)作用
- (2)函数接口
- (3)支持的操作
- (4)线程间通信
一、五种网络IO模型
1.数据传输过程
2.两组重要概念
1.阻塞、非阻塞
(1)阻塞是指当一个操作(例如I/O操作)未完成时,程序或线程会一直等待,直到该操作完成后才能继续执行后续的任务。
(2)非阻塞是指当一个操作未完成时,程序或线程不会等待,而是立即返回,可以继续执行其他任务
阻塞/非阻塞关注的是用户态进程/线程的状态,其要访问的数据是否就绪,进程/线程是否需要等待。当前接口数据还未准备就绪时,线程是否被阻塞挂起。何为阻塞挂起?就是当前线程还处于CPU时间片当中,调用了阻塞的方法,由于数据未准备就绪,则时间片还未到就让出CPU。而非阻塞就是当前接口数据还未准备就绪时,线程不会被阻塞挂起,可以不断轮询请求接口,看看数据是否已经准备就绪。
2.同步、异步
(1)同步是指任务按顺序执行,一个任务的完成依赖于上一个任务的完成。即,当前任务必须等待前一个任务完成后才能继续执行。
(2)定义:异步是指任务可以在等待某个操作完成时继续执行其他任务。即,任务的启动不会阻塞后续任务的执行。
同步/异步关注的是消息通信机制。所谓同步,就是在发出一个调用时,自己需要参与等待结果的过
程,则为同步。同步需要主动读写数据,在读写数据的过程中还是会阻塞。异步IO,则指出发出调用以后到数据准备完成,自己都未参与,则为异步。异步只需要关注IO操作完成的通知,并不主动读写数据,由操作系统内核完成数据的读写。
3.五种网络IO模型
(1)阻塞式IO
①数据等待阶段,应用进程是阻塞态。
②数据从内核态拷贝到用户态,也是阻塞态。
(2)非阻塞式IO
①进程等待数据阶段时,非阻塞。会轮询检查数据是否准备好。
②数据从内核态拷贝到用户态,阻塞。
fcntl(fd); //可以将文件描述符从阻塞式设置为非阻塞式的。
(3)IO多路复用
①应用进程在数据准备阶段会调用IO多路复用的函数,阻塞。但可以监听多个文件描述符
②第二阶段,数据拷贝,阻塞。但是可能有多个文件描述符就绪。
(4)信号驱动IO
①数据等待阶段:应用进程注册信号后,立即返回,执行其他任务。当内核准备好数据后,会用信号通知应用进程取数据
②将数据从内核空间拷贝到用户空间:进程阻塞等待
前四种都是同步IO,第二阶段都是阻塞的。
(5)异步IO
①准备数据:注册信号后立即返回,非阻塞。
②数据从内核态拷贝到用户态:拷贝完成才用信号通知进程,非阻塞。
4.五种网络IO模型的对比
5.举例说明
二、并发服务器模型
1.循环式迭代式模式
一次只能服务一个。只能短连接,不能长连接。业务逻辑代码要短。
2.并发式服务器
创建一个子进程
3.prefork服务器
创建多个子进程
4.反应式服务器 (Reactor)
对多个客户端进行监听,背后是IO多路复用。
串联式:client1的读、业务、写结束后才执行client2,然后client3
5.反应式 + 线程池型服务器
如果业务逻辑很长,则最好分离出来,交给线程池。这样多个业务逻辑可以同时进行(被多个线程同时执行)。
6.多反应式服务器
7.多反应式 + 线程池模型
三、Reactor模型
封装Reactor(Socket网络编程 + IO多路复用)
Reactor + 线程池 = 项目框架
0.Reactor概述
(1)概述
Reactor模式是一种高效的事件驱动编程模型,常用于处理高并发网络请求。其主要思想是用少量的线程来处理大量的客户端连接,通过一个中心的事件分发器(Reactor)来管理这些连接和事件。
基本的Reactor服务器原理图
从图例中可以看到,多个客户端可以同时向Reactor服务器发起请求,而Reactor服务器是可以同时处理这些请求的。其中,Reactor使用IO多路复用技术监听多个客户端,但是为了能与多个客户端进行连接,所以注册了一个连接器Acceptor对象到Reactor中,进行连接事件的处理,也就是执行accept()函数,然后将连接交给Reactor对象,Reactor对象就对该连接进行对应的处理,读连接的数据,处理连接的数据,然后将处理好之后的数据发送给给个客户端,一条连接处理完毕之后继续处理下一条连接。其实这个过程,就是之前在Linux阶段,使用socket网络编程与IO多路复用(也就是epoll技术)实现的逻辑。接下来我们按照面向对象的思想进行重构。
(2)Reactor总结
1.将read与write操作对应的线程,称为IO线程
2.将处理业务的线程,称为计算线程,因为需要用到CPU
3.如果读写操作比较频繁,也就是IO线程使用比较频繁的话,就可以使用最基础的Reactor。即IO密集型任务,适合适用基础的Reactor。【反应式服务器】
4.如果业务逻辑比较复杂,计算线程耗时长,那么就需要加入线程池,即Reactor + ThreadPool的模型,即CPU密集型。【反应式+线程池型服务器】
1.ReactorV1版本
V1版本:将Socket网络编程用到的接口进行类的封装。
(1)类的设计
1.Socket类:将所有与套接字相关的操作,封装成一个Socket类。包括:套接字的创建、套接字的获取、套接字的关闭。
2.InetAddress类:所有与地址相关的操作,都封装到一个InetAddress类中。包括:通过IP与端口号填充结构体struct sockaddr_in、可以通过该结构体获取IP、port
3.Acceptor类:连接器类,将所有服务器的基本操作封装到连接器类中。包括:地址复用、端口复用、bind、listen、accept
4.TcpConnection类:如果Acceptor类的对象调用accept函数有正确的返回结果,表明三次握手建立成功,创立出一条连接,可以通过该连接收发数据。接收数据可以封装为receive,发送数据可以封装为send
5.SocketIO类:将所有接收数据与发送数据的底层逻辑,封装成一个类。该类才是真正进行数据收发的类,封装了read/write、recv/send函数。
(2)类图设计
(3)函数的设计
1.readn():
read要分多次读,因为缓冲区大小不够,可能一次读不了那么多。【SocketIO.cpp】
2.recv()
3.errno == EINTR
中断,更高优先级的线程会抢走CPU控制权
4.bzero()
#include <strings.h>
void bzero(void *s, size_t n);
(1)参数
①s:指向要清零的内存区域的指针。
②n:要清零的字节数。
(2)功能:
bzero 将指针 s 所指向的内存区域的前 n 个字节设置为零。
5.数据成员在初始化列表中,可以用其他成员函数进行初始化。
TcpConnection的传入的fd,是connfd,是accept()返回的。 socket()返回的fd是listenfd
6.getsockname()、getpeername():获取本端ip和对端的ip
(4)代码
代码链接:https://github.com/WangEdward1027/Socket/tree/main/Reactor/ReactorV1
2.ReactorV2版本
V2版本:
(1)封装epoll: 【V2】
加入epoll,并将epoll相关的三个函数epoll_create()、epoll_ctl()、epoll_wait()进行封装。新增了一个事件循环类EventLoop。
(2)每次修改业务逻辑,都要修改源代码,所以把三个半事件变为三个回调函数。【V2.1】
实现三个函数的回调:连接建立、收发数据、连接断开。先在EventLoop中进行注册,该类作为中转类,再给TcpConnection类进行注册回调和执行回调。
学习目标:①epoll的使用、②TCP通信过程的三个事件:连接建立、信息收发、连接断开、
(1)类图设计
V2:封装epoll
waitEpollFd():封装epoll_wait()
v2.1:实现三个回调函数的封装
(2)伪代码
void loop()
{
_isLooping = true;
while(_isLooping)
{
waitEpollFd();
}
}
void unloop()
{
_isLooping = false;
}
void waitEpollFd()
{
nready = epoll_wait(_epfd, _evtList, _evtList.size(), 3000);
if(-1 == nready && errno == EINTR)
{
continue;
}
else if(-1 == nready)
{
//打印错误
return;
}
else if(0 == nready)
{
//超时
}
else
{
for(idx = 0; idx < nready; ++idx)
{
int fd = _evtList[idx].data.fd;
if(文件描述符fd == listenfd)//有新的连接请求上来了
{
handleNewConnection();//处理新的连接请求
}
else
{
handleMessage(fd);//老的连接上进行数据的收发
}
}
}
}
void handleNewConnection()
{
//只要connfd是正常,就表明三次握手建立成功,就可以创建TcpConnection连接
int connfd = _acceptor.accept();
TcpConnection con = new TcpConnection(connfd);//创建连接
_conns.insert({connfd, con});//将文件描述符与连接的对象存放在map中
addEpollReadFd(connfd);//将文件描述符connfd放在红黑树上进行监听
}
void handleMessage(int fd)
{
it = _conns.find(fd);
if(it != _conns.end())//连接是存在
{
string msg = it->second->receive();//接收客户端的数据
it->second->send(msg);//将接收的数据原封不动的发给客户端
}
else
{
//连接不存在
}
}
(3)代码
代码链接:https://github.com/WangEdward1027/Socket/tree/main/Reactor/ReactorV2
(4)TCP网络编程中的三个半事件
一、连接建立
包括服务器端被动接受连接(accept)和客户端主动发起连接(connect)。TCP连接一旦
建立,客户端和服务端就是平等的,可以各自收发数据。
二、连接断开
包括主动断开(close、shutdown)和被动断开(read()返回0)。
1.shutdown()
#include <sys/socket.h>
int shutdown(int sockfd, int how);
how:
①SHUT_RD:关闭读端
②SHUT_WR:关闭写端
③SHUT_RDWR:关闭读写端。相当于close(fd)。
三、消息到达
文件描述符可读。这是最为重要的一个事件,对它的处理方式决定了网络编程的风格(阻塞
还是非阻塞,如何处理分包,应用层的缓冲如何设计等等)。
四、消息发送完毕
这算半个。对于低流量的服务,可不必关心这个事件;另外,这里的“发送完毕”是指数
据写入操作系统缓冲区(内核缓冲区),将由TCP协议栈负责数据的发送与重传,不代表对方已经接收
到数据。
(5)回调函数的注册、执行。将连接建立、消息到达、连接断开三个回调函数写到test中
现在要把NewConnection、handleMessage、close想放到testTcpConnection中。这样要修改逻辑就不需要动源代码了,直接在test中修改。利用的是回调函数。
理解回调的过程
问题:为什么注册回调要用std::move()传参?
答案:能用右值引用,尽量用右值引用。C++11的移动构造能避免拷贝构造的一次深拷贝,执行的是浅拷贝,效率高。
TcpConnection:
用shared_from_this()
的主要目的是在类的成员函数中创建指向自身的 shared_ptr。
3.ReactorV3版本
将第二个版本再进行封装,新增TcpServer类。
第三个版本就是在第二个版本的基础上,将Acceptor与EventLoop做了进一步封装而已,与第二个版本没有本质上的区别。
(1)类图
多加了一层封装:TcpServer类
(2)代码
新增了一个类TcpServer,加入TcpServer.h、TcpServer.cpp
代码链接:https://github.com/WangEdward1027/Socket/tree/main/Reactor/ReactorV3
4.ReactorV4版本
(1)流程梳理
1.V4版本:
(1)加入了ThreadPool用于处理业务逻辑。
(2)引入了eventfd,用于实现ThreadPool和Reactor的通信:线程池的各个子线程处理完任务后,将msg发给Reactor服务器(EventLoop类),类似生产者消费者模型。
2.过程:
(1)客户端向服务器发送消息,服务器接收消息:
客户端向服务器发消息msg。Reactor服务器的EventLoop类中的epoll_wait()监测到connfd而返回,走else if的onMessage逻辑。调用receive()函数来接收msg。
(2)服务器将消息转发给线程池:
Reactor (EventLoop)中的线程池对象 _pool调用addTask(),将msg传入线程池,添加进任务队列 TaskQueue。
(3)线程池进行业务逻辑的处理,并将处理后的消息发回给服务器:
①因为EventLoop本身没有发数据的能力,因此调用sendInEventLoop()将TcpConnection的send函数和msg通过bind包装,发给EventLoop。
②通过addpendings将处理后的msg添加进Reactor(EventLoop)的vector里,并write() eventfd (由wakeup函数封装)。
(4)服务器的epoll_wait()监测到eventfd返回,遍历vector容器,将里面处理后的msg send()给客户端。
//修改EventLoop.cpp、TcpConnection.cpp、TestTcpServer.cpp
3.分析:
(1)将任务的处理添加到线程池中:
不能由线程池的每个线程进行send(),都会经过同一个网口,会阻塞。
2.线程池间通信:使用eventfd系统调用,
(3)将send()函数、TcpConnection对象、处理后的数据,通过bind打包发给EventLoop。使得EventLoop具备了发送数据给客户端的能力。
(2)类图设计
引入了eventfd,要修改 EventLoop类 和 TcpConnection类
线程池的类图,上文实现过,直接拿来用。(基于对象)
完整类图
5.ReactorV5版本
V5版本:解决ThreadPool无法共享的问题。V4的全局变量不太好,就再封装一层,将线程池与TcpServer对象进一步封装,使其在同一个作用域中。
把V4版本中TestTcpServer.cpp中,暴露的所有细节全部封装:如MyTask类的定义、MyTask::process()的定义、三个回调函数的定义、线程池的构造与启动、三个回调函数的注册、Tcp服务器的构造与启动。现在封装成了仅有EchoServer的构造与启动。
【封装:线程池、TcpServer、MyTask类及其成员函数process()、回调函数的定义与注册】
#include "EchoServer.h"
int main(int argc, char **argv)
{
EchoServer server(4, 10, "127.0.0.1", 8888);
server.start();
return 0;
}
(1)类图
(2)代码
代码链接:https://github.com/WangEdward1027/Socket/tree/main/Reactor/ReactorV5
ThreadPool _pool;
TcpServer _server;
EchoServer(size_t threadNum, size_t queSize, const string &ip, unsigned
short port);
~EchoServer();
//让服务器启动与停止
void start();
void stop();
//三个回调函数
void onNewConnection(const TcpConnectionPtr &con);
void onMessage(const TcpConnectionPtr &con);
void onClose(const TcpConnectionPtr &con);
6.进程线程通信方式 eventfd
eventfd的内核计数器值的具体意义:
1.事件通知:内核计数器的值表示未处理的事件的数量
当读取 eventfd 时,获取的是事件的总数,并且计数器被重置。这对于事件通知非常有用,因为它允许接收方知道有多少事件发生,并在读取后重置计数器以接收新的事件。
2.同步机制:
eventfd 可以用于线程之间的同步。例如,一个生产者线程可以向 eventfd 写入值,表示它已完成某个任务;一个消费者线程可以读取 eventfd,确认生产者已完成任务,并进行相应处理。
计数器的值提供了一种简单的方式来通知线程某些事件或状态的变化,而无需复杂的锁机制。
(1)作用
从Linux 2.6.27版本开始,新增了不少系统调用,其中包括eventfd,它的主要是用于进程或者线程间通信(如通知/等待机制的实现)
(2)函数接口
#include <sys/eventfd.h>
int eventfd(unsigned int initval, int flags);
//举例
int evtfd = eventfd(0, 0);
(1)参数
initval:初始化计数器值,该值保存在内核。
flags:如果是2.6.26或之前版本的内核,flags 必须设置为0。
flags支持以下标志位:
EFD_NONBLOCK 类似于使用O_NONBLOCK标志设置文件描述符。
EFD_CLOEXEC 类似open以O_CLOEXEC标志打开, O_CLOEXEC 应该表示执行exec()时,
之前通过open()打开的文件描述符会自动关闭。
(2)返回值
返回值:函数返回一个文件描述符,与打开的其他文件一样,可以进行读写操作。
(3)eventfd支持的操作
eventfd系统调用返回的是文件描述符,该文件描述符与以前学习的文件描述符一样,可以读、写、监
听。
read函数:如果计数器A的值不为0时,读取成功,获得到该值;如果A的值为0,非阻塞模式时,会直
接返回失败,并把error置为EINVAL;如果为阻塞模式,一直会阻塞到A为非0为止。
write函数:将缓冲区写入的8字节整形值加到内核计数器上,即会增加8字节的整数在计数器A上,如果其值达到0xfffffffffffffffe时,就会阻塞(在阻塞模式下),直到A的值被read。
select/poll/epoll:支持被io多路复用监听。当内核计数器的值发生变化时,就会触发事件。
总结:
①eventfd会返回文件描述符fd。
②该fd可以被read函数读。读出的结果是内核计数器的值,读完之后内核计数器的值会被置为0。如果内核计数器本来就是0的话,那么执行read函数的线程就会被阻塞。
③该fd也可以被write进行写操作,写的时候
④内核计数器的数值是未处理的事件数量
(4)进程之间通信
(5)线程之间通信
与进程之间通信模式类似,我们可以在线程之间进行通信,一个线程A读取eventfd返回的文件描述符,如果读取到的内核计数器的值为0,那么就会阻塞;而另外一个线程B向eventfd返回的文件描述符进行写数据,就会唤醒A线程,从而达到B唤醒A,达到B线程(写线程)唤醒(通知)A线程的目的,这样两个线程之间就可以达到通信的目的,就是这个原理。在进程之间通信之后,父进程在读内核计数器的值,这里可以让A线程阻塞等待执行某种任务,只要不被B唤醒,A就一直阻塞,只有被唤醒就可以执行任务。那么使用面向对象封装,可以进行类图设计如下:
(1)数据成员:
① int _evtfd:用于通信的文件描述符,也就是eventfd返回的文件描述符
②EventFdCallback _cb:被唤醒的线程需要执行的任务
③bool _isStarted:标识EventFd运行标志的标志位
(2)成员函数:
①start函数:该函数启动,并通过IO多路复用方式select/poll/epoll中的一种循环监视数据成员,用于通信的文件描述符_evtfd是不是就绪,如果就绪就可以让线程读该文件描述符并且执行被唤醒后
需要执行的事件,也即是EventFdCallback类型的任务。
②stop函数:停止运行。
③handleRead函数:里面封装了read函数,该函数读取eventfd返回的文件描述符。
④wakeup函数:里面封装了write函数,该函数向eventfd返回的文件描述符中写数据,也就是唤醒
阻塞的线程,从而达到通信的目的
(3)核心函数:
EventFd::EventFd(EventFdCallback &&cb)
: _evtfd(createEventFd())
, _cb(std::move(cb))//注册
, _isStarted(false)
{
}
EventFd::~EventFd()
{
close(_evtfd);
}
//运行与停止
void EventFd::start()
{
struct pollfd pfd;
pfd.fd = _evtfd;
pfd.events = POLLIN;
_isStarted = true;
while(_isStarted)
{
int nready = poll(&pfd, 1, 3000);
if(-1 == nready && errno == EINTR)
{
continue;
}
else if(-1 == nready)
{
cerr << "-1 == nready" << endl;
return;
}
else if(0 == nready)
{
cout << ">>poll timeout!!!" << endl;
}
else
{
if(pfd.revents & POLLIN)
{
handleRead();//阻塞等待被唤醒
if(_cb)
{
_cb();//通信之后需要执行的任务
}
}
}
}
}
void EventFd::stop()
{
_isStarted = false;
}
//创建用于通信的文件描述符
int EventFd::createEventFd()
{
int ret = eventfd(10, 0);
if(ret < 0)
{
perror("eventfd");
return ret;
}
return ret;
}
//A线程需要执行的read的操作
void EventFd::handleRead()
{
uint64_t one = 1;
ssize_t ret = read(_evtfd, &one, sizeof(uint64_t));
if(ret != sizeof(uint64_t))
{
perror("read");
return;
}
}
//用于唤醒线程的函数
void EventFd::wakeup()
{
uint64_t one = 1;
ssize_t ret = write(_evtfd, &one, sizeof(uint64_t));
if(ret != sizeof(uint64_t))
{
perror("write");
return;
}
}
7.定时器:timerfd的封装
(1)作用
通过文件描述符的可读事件进行超时通知。
timerfd是Linux提供的一个定时器接口。这个接口基于文件描述符,通过文件描述符的可读事件进行超时通知,所以能够被用于select/poll/epoll的应用场景。timerfd是linux内核2.6.25版本中加入的接口
(2)函数接口
1.创建定时器
#include <sys/timerfd.h>
int timerfd_create(int clockid, int flags);
(1)参数:
clockid
:可设置为
CLOCK_REALTIME
:相对时间,从1970.1.1到目前的时间。更改系统时间 会更改获取的值,它以系
统时间为标。
CLOCK_MONOTONIC
:绝对时间,获取的时间为系统重启到现在的时间,更改系统时间对齐没有影响。
flags
: 可设置为
TFD_NONBLOCK(非阻塞);
TFD_CLOEXEC(同O_CLOEXEC)linux内核2.6.26版本以上都指定为0
(2)返回值:该函数生成一个定时器对象,返回与之关联的文件描述符。
2.设置定时器
int timerfd_settime(int fd, int flags,
const struct itimerspec *new_value,
struct itimerspec *old_value);
struct timespec
{
time_t tv_sec; //精确到秒数
long tv_nsec; //精确到纳秒数
};
struct itimerspec
{
struct timespec it_interval; //定时器周期时间,前后两次超时时间差,【多久闹一次】
struct timespec it_value; //定时器起始时间,绝对时间或相对时间【什么时候开始闹】
};
(1)参数:
fd: timerfd对应的文件描述符
flags: 0表示是相对定时器;TFD_TIMER_ABSTIME表示是绝对定时器
new_value:设置超时时间,如果为0则表示停止定时器。
old_value:一般设为NULL, 不为NULL,则返回定时器这次设置之前的超时时间。
(2)返回值:该函数能够启动和停止定时器。成功返回0,失败返回-1.
(3)读定时器
ssize_t read(int fd, void *buf, size_t count);
(3)支持的操作
定时器对象(也就是定时器创建出来的文件描述符),是可以被读以及监听的。
read函数:读取缓冲区中的数据,其占据的存储空间为sizeof(uint64_t),表示超时次数。
select/poll/epoll:当定时器超时时,会触发定时器相对应的文件描述符上的读操作,IO多路复用操作
会返回,然后再去对该读事件进行处理。
其实就是定时器对象在设置好定时操作后,当设置的超时时间到达后,定时器对象就就绪,就可以被读取了,然后当设置的超时时间到达后,定时器对象就就绪,就可以又被读取了,以此往复。
(4)线程间通信
代码链接:https://github.com/WangEdward1027/Socket/tree/main/Reactor/timerfd
设置好定时器对象(就像设置的闹钟一样),当定时器超时后,会发出超时通知,如果线程之前在循环监视对应的文件描述符,那么文件描述符就会就绪(可读),就可以执行read函数,接下来就可以执行预先设置好的任务。当定时器后续继续超时后,监听的文件描述符会继续就绪,文件描述符继续可读,就可以继续执行任务了,所以可见,每间隔指定的时间,线程都会因为超时而被唤醒,也就达到通知的目的。那么使用面向对象封装,可以进行类图设计如下: