【计算机网络】epoll

news2024/11/20 0:44:02

IO多路转接 - epoll

  • 一、I/O多路转接之 epoll
    • 1. epoll 接口
      • (1)epoll_create()
      • (2)epoll_wait()
      • (3)epoll_ctl()
    • 2. epoll 原理
    • 3. epoll 的优点
    • 4. epoll 的使用
    • 5. epoll 的工作模式
      • (1)水平触发 Level Triggered 工作模式(LT 模式)
      • (2)边缘触发 Edge Triggered 工作模式(ET 模式)
      • (3)LT 和 ET 的对比
  • 二、Reactor
    • 1. 概念
    • 2. 实现
      • (1)Epoller.hpp
      • (2)TcpServer.hpp
      • (3)Calculator.hpp
      • (4)main.cpp
      • (5)CMakeLists.txt
    • 3. 总结

一、I/O多路转接之 epoll

1. epoll 接口

(1)epoll_create()

首先 epoll_create() 这个接口就是帮我们创建一个 epoll 模型,这个模型是什么我们后面介绍原理的时候再讲。

其中 epoll_create() 的接口如下:

在这里插入图片描述

其中 epoll_create1() 是新标准,我们不介绍。而 epoll_create() 的参数 size 也已经废弃了,这个参数传什么也无所谓了,只要大于 0 就可以了。它的返回值也是一个文件描述符,成功则返回一个新的文件描述符,失败则返回 -1.

(2)epoll_wait()

epoll 模型创建好之后,我们想往这个 epoll 模型中新增一个要关心的 fd 及其事件;修改一个或者删除一个文件描述符及其事件;就需要用到 epoll_wait() 这个接口。该接口如下:

在这里插入图片描述

epoll_wait() 本质就是获取已经就绪的文件描述符。第一个参数 epfd 就是 epoll_create() 的返回值;第二个和第三个参数就是我们将来定义的一个用户级缓冲区,返回已经就绪的 fd 和 事件;最后一个参数的含义和 polltimeout 一模一样,单位为毫秒。而返回值表示已经就绪的文件描述符的个数。

其中我们看到第二个参数中带有 struct epoll_event 这个类型的结构体,这个结构体是什么呢?我们来看一下:

在这里插入图片描述

如上图,epoll_event 中的 events 表示哪些事件,它的类型是 uint32_t,也就是一个位图,和 poll 中的 events 一样,以位图的形式传递标记位事件;而第二个字段 data 的类型 epoll_data_t 是一个联合体,就是可以选择该联合体字段中的任意一个,通常用来保存的是用户级的数据,有关这个字段我们后面再说。

其中 events 可以是以下几个宏的集合:

  • EPOLLIN : 表示对应的文件描述符可以读 (包括对端SOCKET正常关闭);
  • EPOLLOUT : 表示对应的文件描述符可以写;
  • EPOLLPRI : 表示对应的文件描述符有紧急的数据可读 (这里应该表示有带外数据到来);
  • EPOLLERR : 表示对应的文件描述符发生错误;
  • EPOLLHUP : 表示对应的文件描述符被挂断;
  • EPOLLET : 将 EPOLL 设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的;
  • EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个 socket 的话,需要再次把这个 socket 加入到 EPOLL 队列里。

(3)epoll_ctl()

为了使用这个 epoll,首先我们也需要将 listen 套接字添加到 epoll 模型里,所以就需要用到 epoll_ctl() 接口。该接口如下:

在这里插入图片描述

它的作用主要是我们想向系统里新增一个文件描述符,及其要关心的事件,想要修改一个特定的文件描述符关心的事件。所以 epoll_ctl() 支持 epoll 来进行相关的管理工作。

其中第一个参数就是 epoll_create() 的返回值。第二个参数 op 就是以下三个选项的其中一个,分别代表增加,修改,删除:

在这里插入图片描述

第三个参数和第四个参数代表哪一个文件描述符上的哪些事件。

所以我们需要使用 epoll 的话,需要使用以上三个系统调用,而 selectpoll 都只有一个系统调用。

2. epoll 原理

无论是 selectpoll 都是用数组来管理文件描述符和对应的事件,更重要的是该数组这个数据结构是由用户来维护的!接下来我们解释一下 epoll 的原理,就能明白为什么需要三个系统调用来使用 epoll 了。

操作系统在硬件层面,通过硬件中断的方式知道网卡上有数据了,然后通过网卡驱动上的方法将数据拷贝到网卡驱动上的数据链路层。同时,操作系统为了支持 epoll,它为我们提供了三种机制:

  • 操作系统在内部会帮我们维护一颗红黑树

其中红黑树中的节点包含的最重要的字段是:int fduint32_t event,分别代表内核要关心的文件描述符和要关心的事件。

  • 操作系统会为我们维护一个就绪队列

一旦红黑树中有特定的一个节点,比如某个节点上的文件描述符的某个事件就绪了,就可以把该节点添加到就绪队列中;其中该就绪队列中每个节点中的字段包含 int fduint32_t event,分别代表已经就绪的文件描述符和已经就绪的事件。

  • 操作系统的底层网卡驱动是允许操作系统去注册一些回调机制的

操作系统内部会提供一个回调函数,这个回调函数是用来干什么的呢?首先网卡通过硬件中断的方式将数据搬到了网卡驱动层。当网卡驱动层中的数据链路层有数据就绪了,主动会调用该回调函数。然后该回调方法会做如下几个操作:

  1. 向上交付
  2. 交付给 TCP 的接收队列
  3. 根据文件描述符为键值查找红黑树,确认这个接受队列和哪一个文件描述符是关联的,再判断该 fd 是否关心了 EPOLLIN 或者 EPOLLOUT 读写事件,如果有,由于数据已经就绪,所以接下来第四步
  4. 构建就绪节点,插入到就绪队列中

实际上我们用 epoll 的时候,操作系统就会把该回调函数注册到底层,然后底层数据一旦就绪就会自动回调执行上面的四个方法。所以对于用户来说,只需要在就绪队列中获取就绪节点即可!整套机制都是由操作系统完成的!

我们把这三套机制叫做 epoll 模型,如下图:

请添加图片描述

其中 eventpollepoll 对象;epitem 为红黑树的节点。

所以接口 epoll_create() 创建 epoll 模型本质就是创建红黑树,创建就绪队列以及注册底层的回调机制。所以该 epoll 模型怎么让进程找到呢?其实给 epoll 模型放入到 struct file 对象即可,把它也当作文件!因为在 Linux 中一切皆文件!struct file 中也有指针指向 epoll 模型!所以再把该 struct file 对象添加到进程的文件描述符表中即可!

所以 epoll_create() 实质上就是在操作系统中创建 struct file,其中的指针指向整个 epoll 对象,对应的文件描述符就能挂接到进程的文件描述符表中,最后把该文件描述符返回给用户,所以我们就可以通过该文件描述符找到 struct file 并找到 epoll 模型了。epoll_ctl() 实质上的增加、修改、删除都是在对红黑树进行操作。其中 epoll_wait() 的第二个参数是输出型参数,它会将就绪队列中所有就绪的节点一个一个地放进 struct epoll_event 里。

在这里插入图片描述

3. epoll 的优点

基于 epoll 的原理,我们可以得到 epoll 的优势:

  • 检测就绪的时间复杂度为 O(1),因为只需要看队列是否为空就可以了。而获取就绪的时间复杂度为 O(n),因为需要将就绪队列中的节点一个一个拷贝到应用层。
  • fd 和 event 没有上限,因为该红黑树有多大由操作系统说了算
  • 由于该红黑树是操作系统帮我们维护的,所以不需要在用户层由用户维护一个数组这样的数据结构,来管理所有的文件描述符及其要关心的事件了
  • epoll_wait() 的返回值 n,表示有 n 个 fd 就绪了,那么该接口还会将已经就绪的节点放入到它的输出型参数 events 中,所以就绪事件是连续的,有 n 个!这意味着,上层用户处理已经就绪的事件,就不再需要像以前一样检测有哪些 fd 是非法的,哪些是没有就绪的了;只需要根据返回值 n,遍历 events 即可!

4. epoll 的使用

我们对 epoll 的相关接口进行一下简单的封装成为 Epoller.hpp,如下:

				#pragma once
				#include "NoCopy.hpp"
				#include "log.hpp"
				
				#include <sys/epoll.h>
				 
				#include <cstring>
				#include <cerrno> 
				
				class Epoller : public NoCopy
				{
				    static const int size = 128;
				public:
				    Epoller()
				    {
				        _epfd = epoll_create(size);
				        if(_epfd == -1){
				            lg(Fatal, "epoll_create error: %s", strerror(errno));
				        }
				        else{
				            lg(Info, "epoll_create success: %d", _epfd);
				        }
				    }
				
				    int EpollerWait(struct epoll_event revents[], int num)
				    {
				        int n = epoll_wait(_epfd, revents, num, _timeout);
				        return n;
				    }
				
				    int EpollerCtl(int oper, int sockfd, uint32_t event)
				    {   
				        int n = 0;
				        if(oper == EPOLL_CTL_DEL)
				        {
				            n = epoll_ctl(_epfd, oper, sockfd, nullptr);
				            if(n != 0){
				                lg(Error, "epoll_ctl delete error!");
				            }
				        }
				        else
				        {
				            // EPOLL_CTL_MOD 或者 EPOLL_CTL_ADD
				            // 设置进内核的红黑树中
				            struct epoll_event ev;
				            ev.events = event;
				            ev.data.fd = sockfd;    // 方便我们后面处理的时候知道是哪一个 fd 就绪了
				
				            n = epoll_ctl(_epfd, oper, sockfd, &ev);
				            if(n != 0){
				                lg(Error, "epoll_ctl error!");
				            }
				        }
				        return n;
				    }
				
				    ~Epoller()
				    {
				        if(_epfd >= 0){
				            close(_epfd);
				        }
				    }
				private:
				    int _epfd;
				    int _timeout = 1000;
				};

接下来编写 epollSever.hpp,如下:

				#pragma once
				
				#include <iostream>
				#include <memory>
				#include <sys/epoll.h>
				 
				#include "Socket.hpp"
				#include "log.hpp"
				#include "Epoller.hpp"
				#include "NoCopy.hpp"
				
				uint32_t EVENT_IN = (EPOLLIN);
				uint32_t EVENT_OUT = (EPOLLOUT);
				
				class EpollServer : public NoCopy
				{
				    static const int maxevents = 64;
				public:
				    EpollServer(uint16_t port)
				        :_port(port)
				        ,_listenSocket_ptr(new Sock())
				        ,_epoller_ptr(new Epoller())
				    {}
				
				    void Init()
				    {
				        _listenSocket_ptr->Socket();
				        _listenSocket_ptr->Bind(_port);
				        _listenSocket_ptr->Listen();
				
				        lg(Info, "create listen socket success: %d\n", _listenSocket_ptr->GetFd());
				    }
				
				    void Accepter()
				    {
				        std::string client_ip;
				        uint16_t client_port;
				        int sockfd = _listenSocket_ptr->Accept(&client_ip, &client_port);
				        if(sockfd > 0){
				            // 不能直接读取,而是将它添加到内核的红黑树中,让 epoll 关心即可
				            _epoller_ptr->EpollerCtl(EPOLL_CTL_ADD, sockfd, EVENT_IN);
				            lg(Info, "get a new link, client info@ %s:%d", client_ip.c_str(), client_port);
				        }
				        else{
				            return;
				        }
				    }
				
				    // for test
				    void Recver(int fd)
				    {
				        char buffer[1024];
				        ssize_t n = read(fd, buffer, sizeof(buffer) - 1);
				        if (n > 0)
				        {
				            buffer[n] = 0;
				            std::cout << "get a message: " << buffer << std::endl;
				
				            // write
				            std::string echo_str = "sever echo $ ";
				            echo_str += buffer;
				            write(fd, echo_str.c_str(), echo_str.size());
				        }
				        else if (n == 0)
				        {
				            lg(Info, "client quit, me too, close fd is: %d", fd);
				            // 先在内核红黑树中移除 fd,再关闭 fd
				            _epoller_ptr->EpollerCtl(EPOLL_CTL_DEL, fd, 0);
				            close(fd);
				        }
				        else
				        {
				            lg(Warning, "recv error, fd is: %d", fd);
				            _epoller_ptr->EpollerCtl(EPOLL_CTL_DEL, fd, 0);
				            close(fd);
				        }
				    }
				
				    void Dispatcher(struct epoll_event revs[], int num)
				    {
				        for(int i = 0; i < num; ++i){
				            uint32_t event = revs[i].events;
				            int fd = revs[i].data.fd;
				
				            if(event & EVENT_IN){
				                // 读事件就绪
				                if(fd == _listenSocket_ptr->GetFd()){
				                    // 获取到一个新连接,连接管理器
				                    Accepter();
				                }
				                else{
				                    // 其它 fd 上的普通读取事件就绪
				                    Recver(fd);
				                }
				            }
				            else if(event & EVENT_OUT){
				                // 写事件就绪
				                // ...
				            }
				            else{
				                // ...
				            }
				        }
				    }
				
				    void Start()
				    {
				        // 将 listenSocket 添加到 epoll 中
				        // 也就是将 listenSocket 和它所关心的事件添加到内核 epoll 模型中的红黑树中!
				        _epoller_ptr->EpollerCtl(EPOLL_CTL_ADD, _listenSocket_ptr->GetFd(), EVENT_IN);
				        
				        struct epoll_event revs[maxevents];
				        while(true)
				        {
				            // 其中 n 最大是 maxevents
				            int n = _epoller_ptr->EpollerWait(revs, maxevents);
				            if(n > 0)
				            {
				                // 有事件就绪,分派事件
				                lg(Debug, "event happend, fd is: %d", revs[0].data.fd);
				                Dispatcher(revs, n); 
				            }
				            else if(n == 0)
				            {
				                lg(Info, "time out...");
				            }
				            else 
				            {
				                lg(Error, "epoll wait error");
				            }
				        }
				    }
				
				    ~EpollServer()
				    {
				        _listenSocket_ptr->Close(); 
				    }
				
				private:
				    std::shared_ptr<Sock> _listenSocket_ptr;
				    std::shared_ptr<Epoller> _epoller_ptr;
				    uint16_t _port;
				};

我们上面两个模块都用到了 NoCopy 这个类,也就是禁止拷贝,代码如下:

				#pragma once
				
				class NoCopy
				{
				public:
				    NoCopy(){}
				    NoCopy(const NoCopy&) = delete;
				    const NoCopy& operator=(const NoCopy&) = delete;
				};

5. epoll 的工作模式

(1)水平触发 Level Triggered 工作模式(LT 模式)

epoll 默认所处的工作模式就是 LT 模式。例如我们上面所写的简单的 epoll 服务器,每次有新的连接到来时,如果我们不处理它,epoll 会每次都通知我们有连接到来了。这种一旦有新的连接到来,或者有新的数据到来,上层如果不取走,底层就会一直通知上层,让上层把数据尽快取走,这种模式就叫做 LT 模式。就像示波器中的高电平,一直有效。

(2)边缘触发 Edge Triggered 工作模式(ET 模式)

ET 模式指的是,数据或者连接,从无到有,从有到多,变化的时候,才会通知我们一次。正是因为 ET 模式有这种特点,才会倒逼程序员每次通知都必须把本轮数据全部取走,怎么保证数据全部取走呢?所以就需要循环读取,直到读取出错!但是我们使用 read() 或者 recv() 在缓冲区中读取数据的时候,当缓冲区的数据没有了,因为它们的读取方式默认是阻塞的,所以此时就会阻塞,服务器就会被挂起!所以我们在 ET 模式下,所有的 fd 必须是要设置为非阻塞的!

(3)LT 和 ET 的对比

  • selectpoll 其实也是工作在 LT 模式下;epoll 既可以支持 LT,也可以支持 ET

  • 普遍地我们认为,ET 的工作模式比 LT 的工作模式通知效率更高,因为通知一次就可以倒逼上层把全部数据读取走。同时也看得出来 ET 模式的 IO 效率也更高,这也就意味着,TCP 会向对方通告一个更大的窗口,从而从概率上让对方一次给自己发送更多的数据!

  • 所谓的 LT 模式和 ET 模式,本质就是向就绪队列中放入多个或者一个就绪的事件

  • 但是 ET 模式就一定比 LT 模式的效率高吗?不一定!因为 LT 也可以将所有的 fd 设置为非阻塞,然后循环读取,也就是当通知一次的时候,就把数据全部取走了,就和 ET 一样了!所以谁的效率高不一定,要看具体的实现。

二、Reactor

1. 概念

我们在上面编写的 epoll 服务器的代码中,在其他普通的 fd 读取事件就绪时,也就是在 Recver() 中,读取是有问题的,因为我们不能区分每次读取上来的数据是一个完整的报文。另外还有其它各种问题,所以我们要对上面的代码使用 Reactor 的设计模式作修改。

所谓的 Reactor 是一种设计模式,翻译过来称为反应堆模式。用于处理事件驱动的系统中的并发操作。它提供了一种结构化的方式来处理输入事件,并将其分发给相应的处理程序。Reactor 模式通常用于网络编程中,特别是在服务器端应用程序中。

要进行正确的 IO 处理,就应该有如下的理解:在应用层一定存在大量的连接,每一个连接在应用层都叫做文件描述符。而在读取每一个文件描述符上的数据的时候,可能根本就没有读取完,此时我们就需要把该文件描述符上的数据临时保存起来。所以我们在写服务器的时候,我们要保证每一个文件描述符及其连接及其缓冲区,都是独立的!

2. 实现

(1)Epoller.hpp

Epoller.hpp 是对 epoll 的系统调用的封装,代码如下:

			#pragma once
			#include "NoCopy.hpp"
			#include "log.hpp"
			
			#include <sys/epoll.h>
			   
			#include <cstring>
			#include <cerrno> 
			
			class Epoller : public NoCopy
			{
			    static const int size = 128;
			public:
			    Epoller()
			    {
			        _epfd = epoll_create(size);
			        if(_epfd == -1){
			            lg(Fatal, "epoll_create error: %s", strerror(errno));
			        }
			        else{
			            lg(Info, "epoll_create success: %d", _epfd);
			        }
			    }
			
			    int EpollerWait(struct epoll_event revents[], int num, int timeout)
			    {
			        int n = epoll_wait(_epfd, revents, num, timeout);
			        return n;
			    }
			
			    int EpollerCtl(int oper, int sockfd, uint32_t event)
			    {   
			        int n = 0;
			        if(oper == EPOLL_CTL_DEL)
			        {
			            n = epoll_ctl(_epfd, oper, sockfd, nullptr);
			            if(n != 0){
			                lg(Error, "epoll_ctl delete error! sockfd: %d", sockfd);
			            }
			        }
			        else
			        {
			            // EPOLL_CTL_MOD 或者 EPOLL_CTL_ADD
			            // 设置进内核的红黑树中
			            struct epoll_event ev;
			            ev.events = event;
			            ev.data.fd = sockfd;    // 方便我们后面处理的时候知道是哪一个 fd 就绪了
			
			            n = epoll_ctl(_epfd, oper, sockfd, &ev);
			            if(n != 0){
			                lg(Error, "epoll_ctl error!");
			            }
			        }
			        return n;
			    }
			
			    ~Epoller()
			    {
			        if(_epfd >= 0){
			            close(_epfd);
			        }
			    }
			private:
			    int _epfd;
			    int _timeout = 1000;
			};

(2)TcpServer.hpp

TcpServer.hpp 是处理 IO 的服务器,代码如下:

			#pragma once
			#include <iostream>
			#include <string>
			#include <unordered_map>
			#include <memory>
			#include <functional>
			
			#include <cerrno>  
			
			#include "log.hpp"
			#include "NoCopy.hpp"
			#include "Epoller.hpp"
			#include "Socket.hpp"
			#include "Comm.hpp"
			
			// 设置 ET 模式
			uint32_t EVENT_IN = (EPOLLIN | EPOLLET);
			uint32_t EVENT_OUT = (EPOLLOUT | EPOLLET);
			const static int g_buffer_size = 128;
			
			class Connection;
			class TcpServer;
			
			using func_t = std::function<void(std::weak_ptr<Connection>)>;
			using except_t = std::function<void(std::weak_ptr<Connection>)>;
			
			// 管理每一个连接
			class Connection 
			{
			public:
			    Connection(int sockfd)
			        :_sockfd(sockfd)
			    {}
			
			    void SetHandler(func_t recv_cb, func_t send_cb, except_t except_cb)
			    {
			        _recv_cb = recv_cb;
			        _send_cb = send_cb;
			        _except_cb = except_cb;
			    }
			
			    ~Connection()
			    {}
			private:
			    int _sockfd;
			
			    // 充当缓冲区
			    std::string _inbuffer; 
			    std::string _outbuffer;
			public:
			    // 回指指针
			    // std::shared_ptr<TcpServer> _tcpServer_ptr;
			    std::weak_ptr<TcpServer> _tcpServer_ptr;
			
			    // 回调方法
			    func_t _recv_cb;
			    func_t _send_cb;
			    except_t _except_cb;
			
			    std::string _ip;
			    uint16_t _port;
			
			    int Sockfd()
			    {
			        return _sockfd;
			    }
			
			    void AppendInBuffer(const std::string& info)
			    {
			        _inbuffer += info;
			    }
			
			    void AppendOutBuffer(const std::string& info)
			    {
			        _outbuffer += info;
			    }
			
			    std::string& Inbuffer()
			    {
			        return _inbuffer;
			    }
			
			    std::string& Outbuffer()
			    {
			        return _outbuffer;
			    }
			
			    void SetWeakPtr(std::weak_ptr<TcpServer> tcpServer_ptr)
			    {
			        _tcpServer_ptr = tcpServer_ptr;
			    }
			};
			
			
			// enable_shared_from_this 可以提供返回当前对象的 this 对应的 shared_ptr
			class TcpServer : public std::enable_shared_from_this<TcpServer>, public NoCopy
			{
			    static const int num = 64;
			public:
			    TcpServer(uint16_t port, func_t OnMessage)
			        :_port(port)
			        ,_quit(true)
			        ,_OnMessage(OnMessage)
			        ,_epoller_ptr(new Epoller())
			        ,_listenSock_ptr(new Sock())
			    {}
			
			    void Init()
			    {
			        _listenSock_ptr->Socket();
			        // 将 fd 设置为非阻塞
			        SetNonBlockOrDie(_listenSock_ptr->GetFd());
			        _listenSock_ptr->Bind(_port);
			        _listenSock_ptr->Listen();
			        lg(Info, "create listen socket success: %d\n", _listenSock_ptr->GetFd());
			
			        AddConnection(_listenSock_ptr->GetFd(), EVENT_IN, \
			            std::bind(&TcpServer::Accepter, this, std::placeholders::_1), nullptr, nullptr);
			    }
			
			    void AddConnection(int sockfd, uint32_t event, func_t recv_cb, func_t send_cb, except_t except_cb,\
			        const std::string& ip = "0.0.0.0", uint16_t port = 0)
			    {
			        // 1. 给所有的套接字建立一个 connection 对象
			        std::shared_ptr<Connection> new_connection(new Connection(sockfd));
			
			        // shared_from_this(): 返回当前对象的 shared_ptr
			        new_connection->SetWeakPtr(shared_from_this()); 
			        
			        new_connection->SetHandler(recv_cb, send_cb, except_cb);
			        new_connection->_ip = ip;
			        new_connection->_port = port;
			
			        // 2. 将套接字和 Connection 添加到 unordered_map 中
			        _connections.insert(std::make_pair(sockfd, new_connection));
			
			        // 3. 将 listen 套接字或其它事件添加到 epoll 模型中
			        _epoller_ptr->EpollerCtl(EPOLL_CTL_ADD, sockfd, event);
			
			        lg(Debug, "add a new connection success, sockfd is: %d", sockfd);
			    }
			
			    // listen 套接字的连接管理器,即有事件就绪的时候,就是有连接到来,就需要处理新连接
			    void Accepter(std::weak_ptr<Connection> conn)
			    {
			        auto connection = conn.lock();
			        // 不断检测是否还有新连接,直到读取出错
			        while(true){
			            struct sockaddr_in peer;
			            socklen_t len = sizeof(peer);
			            int sockfd = ::accept(connection->Sockfd(), (sockaddr*)&peer, &len);
			
			            // 获取到新连接设置为非阻塞,然后构建 Connection 对象放入哈希表中和内核红黑树中
			            if(sockfd > 0){
			                uint16_t peer_port = ntohs(peer.sin_port);
			                char ipbuffer[128];
			                inet_ntop(AF_INET, &peer.sin_addr, ipbuffer, sizeof(ipbuffer));
			                lg(Debug, "get a new client, get info-> [%s: %d], sockfd: %d", ipbuffer, peer_port, sockfd);
			
			                SetNonBlockOrDie(sockfd);
			                AddConnection(sockfd, EVENT_IN, \
			                    std::bind(&TcpServer::Recver, this, std::placeholders::_1),\
			                    std::bind(&TcpServer::Sender, this, std::placeholders::_1),\
			                    std::bind(&TcpServer::Excepter, this, std::placeholders::_1),\
			                    ipbuffer, peer_port);
			            }
			            else{
			                if(errno == EWOULDBLOCK){   // 读取完毕
			                    break;
			                }
			                else if(errno == EINTR){ // 信号原因中断
			                    continue;
			                }
			                else{
			                    break;
			                }
			            }
			        }
			    }
			
			    // 普通事件的事件管理器
			    // 对于服务器而言只需要进行IO,不需要关心是否读完和报文的格式
			    void Recver(std::weak_ptr<Connection> conn)
			    {
			        if(conn.expired()) return;
			        auto connection = conn.lock();
			
			        int sockfd = connection->Sockfd();
			        while(true){
			            char buffer[g_buffer_size];
			            memset(buffer, 0, sizeof(buffer));
			            ssize_t n = recv(sockfd, buffer, sizeof(buffer) - 1, 0);   // 非阻塞读取
			            if(n > 0){
			                connection->AppendInBuffer(buffer);
			            }
			            else if(n == 0){
			                lg(Info, "sockfd: %d, client info %s: %d quit...", sockfd, connection->_ip.c_str(), connection->_port);
			                connection->_except_cb(connection);
			            }
			            else{
			                if(errno == EWOULDBLOCK){
			                    break;
			                }
			                else if(errno == EINTR){
			                    continue;
			                }
			                else{
			                    lg(Warning, "sockfd: %d, client info %s: %d recv error...", sockfd, connection->_ip.c_str(), connection->_port);
			                    connection->_except_cb(connection);
			                    return;
			                }
			            }
			        }
			        // 交给上层处理,读取到的数据都在 connection 中
			        // 1. 检测
			        // 2. 如果有完整报文,就处理
			        _OnMessage(connection);
			    }
			
			    void Sender(std::weak_ptr<Connection> conn)
			    {
			        if(conn.expired()) return;
			        auto connection = conn.lock();
			        
			        auto& outbuffer = connection->Outbuffer();
			        while(true){
			            ssize_t n = send(connection->Sockfd(), outbuffer.c_str(), outbuffer.size(), 0);
			            if(n > 0){
			                outbuffer.erase(0, n);
			                if(outbuffer.empty()){
			                    break;
			                }
			            }
			            else if(n == 0){
			                return;
			            }
			            else{
			                if(errno == EWOULDBLOCK){
			                    break;
			                }
			                else if(errno == EINTR){
			                    continue;
			                }
			                else{
			                    lg(Warning, "sockfd: %d, client info %s: %d send error...", connection->Sockfd(), connection->_ip.c_str(), connection->_port);
			                    connection->_except_cb(connection);
			                    return;
			                }
			            }
			        }
			        // 没发完
			        if(!outbuffer.empty()){
			            // 开始对写事件关心
			            EnableEvent(connection->Sockfd(), true, true);
			        }
			        else{
			            // 关闭对写事件关心
			            EnableEvent(connection->Sockfd(), true, false);
			        }
			    }
			
			    void Excepter(std::weak_ptr<Connection> connection)
			    {
			        if(connection.expired()) return;
			        auto conn = connection.lock();
			
			        lg(Debug, "Excepter hander sockfd: %d, client info %s: %d excepter handler", \
			            conn->Sockfd(), conn->_ip.c_str(), conn->_port);
			
			        // 1. 移除对特定 fd 的关心
			        // EnableEvent(connection->Sockfd(), false, false);
			        _epoller_ptr->EpollerCtl(EPOLL_CTL_DEL, conn->Sockfd(), 0);
			        // 2. 关闭异常的 fd
			        lg(Debug, "close %d done...\n", conn->Sockfd());
			        close(conn->Sockfd());
			        // 3. 从 _connections 中移除 fd 和 Connection 的映射关系
			        lg(Debug, "remove %d from _connections...\n", conn->Sockfd());
			        _connections.erase(conn->Sockfd());
			    }
			
			    void EnableEvent(int sockfd, bool readAble, bool writeAble)
			    {
			        uint32_t events = 0;
			        events |= ((readAble ? EPOLLIN : 0) | (writeAble ? EPOLLOUT : 0) | EPOLLET);
			        _epoller_ptr->EpollerCtl(EPOLL_CTL_MOD, sockfd, events);
			    }
			
			    bool IsConnectionExist(int fd)
			    {
			        auto iter = _connections.find(fd);
			        if(iter == _connections.end()){
			            return false;
			        }
			        else{
			            return true;
			        }
			    }
			
			    void Dispatcher(int timeout)
			    {
			        int n = _epoller_ptr->EpollerWait(revs, num, timeout);
			        for(int i = 0; i < n; i++){
			            uint32_t event = revs[i].events;
			            int sockfd = revs[i].data.fd;
			
			            // 一旦事件异常,统一把异常转换为读写问题
			            if(event & EPOLLERR){
			                event |= (EPOLLIN | EPOLLOUT);
			            }
			            if(event & EPOLLHUP){
			                event |= (EPOLLIN | EPOLLOUT);
			            }
			            
			            // 只需要处理读写
			            if((event & EPOLLIN) && IsConnectionExist(sockfd)){
			                if(_connections[sockfd]->_recv_cb){
			                    _connections[sockfd]->_recv_cb(_connections[sockfd]);
			                }
			            }
			            if((event & EPOLLOUT) && IsConnectionExist(sockfd)){
			                if(_connections[sockfd]->_send_cb){
			                    _connections[sockfd]->_send_cb(_connections[sockfd]);
			                }
			            }
			        }
			    }
			
			    void Loop()
			    {
			        _quit = false;
			        
			        // AddConnection();
			        while(!_quit)
			        {
			            // 事件派发
			            // Dispatcher(3000);
			            Dispatcher(-1);
			            PrintConnection();
			        }
			        _quit = true;
			    }
			
			    void PrintConnection()
			    {
			        std::cout << "_connections fd list: ";
			        for(auto& connection : _connections){
			            std::cout << connection.second->Sockfd() << ", ";
			            std::cout << "inbuffer: " << connection.second->Inbuffer() << " ";
			        }
			        std::cout << std::endl;
			    }
			
			    ~TcpServer()
			    {}
			private:
			    std::shared_ptr<Epoller> _epoller_ptr;
			    std::shared_ptr<Sock> _listenSock_ptr;   
			    uint16_t _port;
			    bool _quit;
			
			    struct epoll_event revs[num];
			
			    // fd 到对应连接到映射,_connections 就是当前服务器管理的所有连接
			    std::unordered_map<int, std::shared_ptr<Connection>> _connections;
			
			    // 让上层处理信息
			    func_t _OnMessage;
			};

(3)Calculator.hpp

Calculator.hpp 是上层处理业务的具体处理方法,代码如下:

			#pragma once
			#include <string>
			#include <iostream>
			#include "Protocol.hpp"
			
			enum
			{
			    DIV_ERR = 1,
			    MOD_ERR = 2,
			    OP_ERR = 3
			};
			
			
			// 上层业务
			class Calculator
			{
			public:
			    Calculator()
			    {}
			
			    Response CalculatorHelper(const Request &req)
			    {
			        Response resp(0, 0);
			        switch (req._op)
			        {
			        case '+':
			            resp._result = req._x + req._y;
			            break;
			        case '-':
			            resp._result = req._x - req._y;
			            break;
			        case '*':
			            resp._result = req._x * req._y;
			            break;
			        case '%':
			        {
			            if (req._y == 0)
			                resp._code = MOD_ERR;
			            else
			                resp._result = req._x % req._y;
			        }
			        break;
			        case '/':
			        {
			            if (req._y == 0)
			                resp._code = DIV_ERR;
			            else
			                resp._result = req._x / req._y;
			        }
			        break;
			        default:
			            resp._code = OP_ERR;
			            break;
			        }
			        return resp;
			    }
			
			    // "len"\n"10 + 20"\n
			    std::string Handler(std::string &package)
			    {
			        std::string content;
			        bool ret = Decode(package, &content); // content = "10 + 20"
			        if (!ret)
			            return "";
			
			        Request req;
			        ret = req.Deserialize(content); // x = 10, y = 20, op = '+'
			        if (!ret)
			            return "";
			
			        content = "";
			        Response resp = CalculatorHelper(req); // result = 30, code = 0
			
			        resp.Serialize(&content);              // content = "30 0"
			        content = Encode(content);             // content = "len"\n"30 0\n"
			
			        return content;
			    }
			
			    ~Calculator()
			    {}
			};

(4)main.cpp

下面是主函数的调用:

			#include <iostream>
			#include <memory>
			#include <functional>
			
			#include "TcpServer.hpp"    // 处理IO
			#include "Calculator.hpp"   // 处理业务
			#include "log.hpp"
			
			Calculator calculator;
			
			void DefaultOnMessage(std::weak_ptr<Connection> conn)
			{
			    if(conn.expired()) return;
			    auto connection_ptr = conn.lock();
			
			    std::cout << "Application layerget a message: " << connection_ptr->Inbuffer() << std::endl;
			
			    // 对报文进行处理
			    std::string response_str = calculator.Handler(connection_ptr->Inbuffer());
			    if(response_str.empty()){
			        return;
			    }
			    lg(Debug, "%s", response_str.c_str());
			
			    // response_str 发送出去
			    connection_ptr->AppendOutBuffer(response_str);
			
			    // 因为写事件(发送缓冲区是否有空间,经常是ok的),经常是就绪的
			    // 所以如果我们设置对 EPOLLOUT 关心,那么 EPOLLOUT 几乎每次都是就绪的
			    // 就导致 epollserver 经常返回,浪费 CPU 资源
			    // 所以,对于读取,我们设置为常关心;对于写,我们设置为按需设置
			    // 处理写事件:直接写入,如果写入完成,就结束。
			    // 如果写入完成,但是数据还没有写完,_outbuffer 里还有内容,我们就需要设置对写事件进行关心了,如果写完了,就去掉写事件的关心
			    // connection_ptr->_send_cb(connection_ptr);
			    auto tcpserver = connection_ptr->_tcpServer_ptr.lock();
			    tcpserver->Sender(connection_ptr);
			}
			
			int main()
			{ 
			    std::shared_ptr<TcpServer> tcp_svr(new TcpServer(8888, DefaultOnMessage));
			    tcp_svr->Init();
			    tcp_svr->Loop();
			
			    return 0;
			}

其中有一些头文件例如 Socket.hpp 和 log.hpp 我们以前已经用过,这里就不再放出来了。

(5)CMakeLists.txt

			cmake_minimum_required(VERSION 2.8)
			project(ReactorServer)
			
			add_executable(reactorServer main.cc)
			target_link_libraries(reactorServer jsoncpp)
			
			add_executable(clientCal ClientCal.cc)
			target_link_libraries(clientCal jsoncpp)

3. 总结

Reactor 其实是一个半同步半异步模型,那么 IO 等于等待+数据拷贝,所以 Reactor 的半同步半异步体现在,等待是由 epoll 完成,这是体现同步;异步体现在 Reactor 可以进行回调处理。

Reactor 模式中,有一个事件循环(Event Loop)负责监听和分发事件。当有新的事件到达时,事件循环会将其分发给相应的处理程序进行处理。这种方式可以实现高效的并发处理,避免了线程创建和销毁的开销。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1570440.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

中高级前端? 这些一元运算符,你真的搞清楚了吗

前言 一元运算符&#xff0c;不太起眼&#xff0c;作用很大&#xff0c;请别忽视她&#xff01; 走近她&#xff0c;爱上她&#xff01; 定义 只需要一个操作数的运算符称为一元运算符。 还是代码容易懂&#xff1a; 1 // 一个操作数1 2 // 两个操作数一元运算符清单 运…

MacOS - brew 和 brew cask 有什么区别?

brew 是 ruby 的包管理&#xff0c;后来看 yangzhiping 的博客介绍了 brew cask&#xff0c;感觉 cask 是更好的关联关系管理&#xff0c;但是&#xff0c;我后来使用过程中&#xff0c;发现很多软件 brew cask 里没有&#xff0c;但是 brew 里面倒是挺多&#xff01;今天来给说…

一键批量高效记账,支持通过关键词来筛选某个人的借款记录,方便高效管理收支明细

随着生活节奏的加快&#xff0c;个人和企业财务管理变得越来越复杂。尤其是在处理大量记账任务时&#xff0c;如何快速、准确地完成&#xff0c;并且能够方便地追踪和筛选特定借款记录&#xff0c;成为了许多人关注的焦点。现在&#xff0c;我们为您提供一款全新的财务管理工具…

小程序appsecret在哪里看

问题开发中需要用到appid和secret。但找不到secret。后面发现是在微信开发者平台自己生成才行 微信公众平台 (qq.com) 在开发者平台的开发管理当中&#xff0c;生产即可

U盘位置不可用?数据恢复有高招!

在我们日常生活和工作中&#xff0c;U盘已经成为不可或缺的数据存储工具。然而&#xff0c;有时我们会遭遇一个令人头疼的问题&#xff1a;将U盘插入电脑后&#xff0c;系统提示“U盘位置不可用”。这究竟意味着什么呢&#xff1f;简单来说&#xff0c;这就是电脑无法识别或访问…

mysql慢sql排查与分析

当MySQL遇到慢查询&#xff08;慢SQL&#xff09;时&#xff0c;我们可以通过以下步骤进行排查和优化&#xff1a; 标题开启慢查询日志&#xff1a; 确保MySQL的慢查询日志已经开启。通过查看slow_query_log和slow_query_log_file变量来确认。 如果没有开启&#xff0c;可以…

新项目视频号小店挣钱吗?可全职可副业可量化,新蓝海项目来了!

大家好&#xff0c;我是电商花花。 一个有潜力的新项目&#xff0c;自然会吸引到很多创业者的追捧&#xff0c;尤其是&#xff0c;目前最火的视频号小店无货源项目&#xff0c;可以说是继抖音小店无货源电商之外又一匹黑马。 今年想创业的人不妨看一看&#xff0c;视频号小店值…

chatGPT4无法登录

遇到问题&#xff1a;chatgpt网站上点击登录&#xff08;log in),网站就会跳转并显示&#xff1a;unable to connect 解决方法&#xff1a;不要用亚洲节点&#xff0c;亚洲节点被全面封禁&#xff0c;在全局代理中可以换成美国的节点

约数与倍数-第12届蓝桥杯选拔赛Python真题精选

[导读]&#xff1a;超平老师的Scratch蓝桥杯真题解读系列在推出之后&#xff0c;受到了广大老师和家长的好评&#xff0c;非常感谢各位的认可和厚爱。作为回馈&#xff0c;超平老师计划推出《Python蓝桥杯真题解析100讲》&#xff0c;这是解读系列的第45讲。 约数与倍数&#…

基于SpringBoot Vue学生信息管理

一、&#x1f4dd;功能介绍 基于SpringBoot Vue学生信息管理 角色&#xff1a;管理员、学生、教师 管理员&#xff1a;管理员进入主页面&#xff0c;主要功能包括对系统首页、个人中心、学生管理、教师管理、公告通知管理、课程类型管理、课程信息管理、选课信息管理、课程成…

Java 关键字 this 使用详解(通俗易懂)

this关键字主要有以下三个地方使用 在方法体中引用当前对象&#xff0c;即其方法被调用的对象&#xff0c;以便将当前对象的实例变量或当前对象作为参数传递给其他方法。 ① t this.x; 要在方法中引用当前对象&#xff0c;可以使用关键字 this。 ② return this; 作为当前…

Spring注解开发和XML开发

目录 Spring简介发展史Spring Framework系统架构spring 核心概念IOC、IOC容器、Bean、DIIOC快速入门DI快速入门 IOCBean基础配置id与class属性name属性scope属性 Bean的实例化构造方法静态工厂实例工厂FactoryBean的使用&#xff08;工厂实例的简化&#xff09; Bean的生命周期…

160 Linux C++ 通讯架构实战14,epoll 反应堆模型

到这里&#xff0c;我们需要整理一下之前学习的epoll模型&#xff0c;并根据之前的epoll模型&#xff0c;提出弊端&#xff0c;进而整理epoll反应堆模型&#xff0c;进一步深刻理解&#xff0c;这是因为epoll实在是太重要了。 复习之前的epoll的整体流程以及思路。 参考之前写…

DHCP工作过程以及抓包分析

从PC1的e0/0/1接口进行抓包 客户端基于UDP、源端口68、目标端口67进行广播请求&#xff0c;源IP0.0.0.0&#xff0c;&#xff08;无效地址&#xff0c;代表本地无地址&#xff09;目标IP255.255.255.255&#xff1b; 从下面截图可以看出&#xff1a; 源mac为电脑mac&#xff…

采用C#.net6.0+Vue,Ant-Design技术开发的一套大型医院手术麻醉信息系统源码,系统成熟,运行稳定

手术麻醉信息系统源码&#xff0c;C#手麻系统源码&#xff0c;自主版权应用案例&#xff08;适合上项目&#xff09; 手术麻醉信息系统可以实现手术室监护仪、麻醉机、呼吸机、输液泵等设备输出数据的自动采集&#xff0c;采集的数据能据如实准确地反映患者生命体征参数的变化&…

【Leetcode笔记】102.二叉树的层序遍历

目录 知识点Leetcode代码&#xff1a;ACM模式代码&#xff1a; 知识点 vector、queue容器的操作 对vector<int> vec;做插入元素操作&#xff1a;vec.push_back(x)。对queue<TreeNode*> que;做插入元素操作&#xff1a;que.push(root);。队列有四个常用的操作&…

Redis从入门到精通(六)Redis实战(三)优惠券秒杀

↑↑↑下载测试项目原代码↑↑↑ 文章目录 前言4.3 优惠券秒杀4.3.1 数据表与实体类4.3.2 添加优惠券4.3.2.1 添加普通券代码4.3.2.2 添加秒杀券代码 4.3.3 实现秒杀下单4.3.3.1 秒杀下单逻辑分析4.3.3.2 获取秒杀订单ID4.3.3.3 获取用户ID4.3.3.4 实现秒杀下单 前言 Redis实战…

团体程序设计天梯赛-练习集 01

天梯赛题解合集 团体程序设计天梯赛-练习集 (L1-001 - L1-012) 团体程序设计天梯赛-练习集 (L1-013 - L1-024) 团体程序设计天梯赛-练习集 (L1-025 - L1-036) 团体程序设计天梯赛-练习集 (L1-037 - L1-048) L1-001 Hello World 输出题 样例 输入 输出 Hello World!思…

笔记本电脑win7 Wireless-AC 7265连不上wifi6

1.背景介绍 旧路由器连接人数有限&#xff0c;老旧&#xff0c;信号不稳定更换了新路由器&#xff0c;如 TL-XDR5430易展版用户电脑连不上新的WIFI网络了&#xff0c;比较着急 核心问题&#xff1a;有效解决笔记本连接wifi上网问题&#xff0c;方法不限 2.环境信息 Windows…

深入探索MySQL:成本模型解析与查询性能优化,及未来深度学习与AI模型的应用展望

码到三十五 &#xff1a; 个人主页 在数据库管理系统中&#xff0c;查询优化器是一个至关重要的组件&#xff0c;它负责将用户提交的SQL查询转换为高效的执行计划。在MySQL中&#xff0c;查询优化器使用了一个称为“成本模型”的机制来评估不同执行计划的优劣&#xff0c;并选择…