【Linux】Reactor模式

news2024/11/27 14:44:42

​🌠 作者:@阿亮joy.
🎆专栏:《学会Linux》
🎇 座右铭:每个优秀的人都有一段沉默的时光,那段时光是付出了很多努力却得不到结果的日子,我们把它叫做扎根
在这里插入图片描述

目录

    • 👉Reactor 模式👈
      • 什么是 Reactor 模式
      • Reactor 模式的组件
      • Reactor 模式的工作流程
    • 👉使用 Reactor 模式设计 TcpServer👈
      • socket 的封装
      • 日志模块
      • Epoll 模型的封装
      • TcpServer 的设计
      • 协议定制
      • 服务端的编写
      • 客户端的编写
      • 综合演示
    • 👉总结👈

👉Reactor 模式👈

什么是 Reactor 模式

Reactor 模式是一种事件驱动的设计模式,是将就绪的事件交给特定的事件处理器,用于实现高效的事件驱动程序。它主要用于网络编程中,用于处理大量的并发连接。

Reactor 模式的组件

Reactor 模式的核心思想是将 I / O 操作(如网络请求、文件操作等)抽象为事件,然后通过一个事件循环(Event Loop)来监视和分发这些事件。它的基本组成部分包括:

  • 事件(Event):在网络服务器中,事件通常是指 socket 上的读事件、写事件和异常事件等。事件通常都需要和 socket 与事件处理器绑定在一起。事件可以在 I / O 通道状态发生变化时被触发,然后被事件分发器通知给事件循环。事件循环通过事件的类型和相关信息来确定哪个事件处理器应该处理这个事件。

  • 事件处理器(Handler):每一种 I / O 事件都对应一个处理器,负责处理特定类型的事件。例如,网络连接事件、数据读取事件、数据写入事件等都可以有对应的事件处理器。

  • 事件循环(Event Loop):是 Reactor 模式的核心部分,它不断地检查是否有新的事件发生,如果有就将事件分发给相应的事件处理器进行处理。事件循环通常以无限循环的形式运行,直到系统关闭。

  • 事件分发器(Demultiplexer):用于监视多个 I / O 通道的状态,以确定哪些通道已经就绪(可读或可写)。这个组件可以使用操作系统提供的机制,如 select、poll、epoll 等。高性能的网络服务器使用的都是 epoll 接口。

Reactor 模式的工作流程

Reactor模式的工作流程如下:

  • 应用程序初始化:创建事件处理器和事件循环,将事件处理器注册到事件循环中。

  • 事件循环开始:事件循环开始无限循环,在每次循环中,它会通过事件分发器检查所有的I/O通道,看是否有事件就绪。

  • 事件分发:当某个I/O通道就绪时,事件分发器将通知事件循环,事件循环根据就绪的通道找到对应的事件处理器,并将事件传递给它进行处理。

  • 事件处理:事件处理器根据收到的事件类型执行相应的操作,这可能涉及读取数据、写入数据、连接管理等。

  • 事件处理完成:事件处理器执行完毕后,将结果返回给事件循环。

通过这种方式,Reactor 模式可以实现高并发的 I / O 操作,因为它使用事件循环在单线程中处理多个 I / O 事件,避免了创建多个线程或进程,从而减少了资源开销和上下文切换的成本。

但是 Reactor 模式适用于 I / O 密集型的应用,但不适用于 CPU 密集型的场景,因为在事件处理过程中如果发生阻塞操作,会影响其他事件的处理。为了解决这个问题,可以将阻塞的操作委托给线程池等机制来处理。

👉使用 Reactor 模式设计 TcpServer👈

socket 的封装

#pragma once

#include <iostream>
#include <string>
#include <cstring>
#include <cerrno>
#include <cassert>
#include <unistd.h>
#include <memory>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <ctype.h>
#include <fcntl.h>

class Sock
{
private:
    // listen的第二个参数是底层全链接队列的长度,其数值为listen的第二个参数+1
    const static int gbackLog = 10;
public:
    Sock()
    {}

    // 常见套接字
    static int Socket()
    {
        int sock = socket(AF_INET, SOCK_STREAM, 0);
        if(listen < 0) exit(2);
        int opt = 1;
        setsockopt(sock, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt));
        return sock;
    }

    // 绑定 IP 地址和端口号
    static void Bind(int sock, uint16_t port, std::string ip = "0.0.0.0")
    {
        struct sockaddr_in local;
        memset(&local, 0, sizeof local);
        local.sin_family = AF_INET;
        local.sin_port = htons(port);
        inet_pton(AF_INET, ip.c_str(), &local.sin_addr);
        if(bind(sock, (struct sockaddr*)&local, sizeof(local)) < 0) exit(3);
    }

    static void Listen(int sock)
    {
        if(listen(sock, gbackLog) < 0)
            exit(4);
    }

    // 一般经验
    // const std::string &: 输入型参数
    // std::string *: 输出型参数
    // std::string &: 输入输出型参数
    // 获取新连接
    static int Accept(int listenSock, std::string* ip, uint16_t* port, int* acceptErrno)
    {
        struct sockaddr_in src;
        socklen_t len = sizeof(src);
        *acceptErrno = 0;
        int serviceSock = accept(listenSock, (struct sockaddr*)&src, &len);
        if(serviceSock < 0) 
        {
            *acceptErrno = errno;
            return -1;
        }
        if(port) *port = ntohs(src.sin_port);
        if(ip) *ip = inet_ntoa(src.sin_addr);
        return serviceSock;
    }

    // 连接服务器
    static bool Connect(int sock, const std::string& serverIp, const uint16_t& serverPort)
    {
        struct sockaddr_in server;
        memset(&server, 0, sizeof server);
        server.sin_family = AF_INET;
        server.sin_port = htons(serverPort);
        server.sin_addr.s_addr = inet_addr(serverIp.c_str());

        if(connect(sock, (struct sockaddr*)&server, sizeof server) == 0) return true;
        else return false;
    }

    // 将 sock 设置为非阻塞,与 ET 模式配合使用
    static bool SetNonBlock(int sock)
    {
        int fl = fcntl(sock, F_GETFL);
        if(fl < 0)  return false;
        fcntl(sock, F_SETFL, fl | O_NONBLOCK);
        return true;
    }

    ~Sock()
    {}
};

Sock 类主要是封装了 socket 的相关接口,主要是创建套接字、绑定 IP 地址和端口号、将套接字设置为监听套接字、连接服务器、服务器获取新连接以及将套接字设置为非阻塞等等。

日志模块

#pragma once

#include <cstdio>
#include <cstdarg>
#include <string>
#include <iostream>
#include <poll.h>
#include <ctime>

// 日志等级
#define DEBUG   0
#define NORMAL  1
#define WARNING 2
#define ERROR   3
#define FATAL   4

#define LOGFILE "./ThreadPool.log"

const char* levelMap[] = 
{
    "DEBUG",
    "NORMAL",
    "WARNING",
    "ERROR",
    "FATAL"
};

void logMessage(int level, const char* format, ...)
{
    char stdBuffer[1024];   // 标准部分
    time_t timestamp = time(nullptr);
    // struct tm *localtime = localtime(&timestamp);
    snprintf(stdBuffer, sizeof stdBuffer, "[%s] [%ld] ", levelMap[level], timestamp);

    char logBuffer[1024];   // 自定义部分
    va_list args;   // va_list就是char*的别名
    va_start(args, format); // va_start是宏函数,让args指向参数列表的第一个位置
    // vprintf(format, args); // 以format形式向显示器上打印参数列表
    vsnprintf(logBuffer, sizeof logBuffer, format, args);

    va_end(args);   // va_end将args弄成nullptr

    // FILE* fp = fopen(LOGFILE, "a");
    printf("%s%s\n", stdBuffer, logBuffer);
    // fprintf(fp, "%s%s\n", stdBuffer, logBuffer);    // 向文件中写入日志信息
    // fclose(fp);
}

日志模块在之前的博客中已经多次提及到,就不再赘述了。

Epoll 模型的封装

#pragma once

#include <iostream>
#include <sys/epoll.h>

class Epoll
{
private:
    const static int defaultSize = 128;
    const static int defaultTimeOut = 3000;

public:
    Epoll(int timeout = defaultTimeOut)
        : _timeout(timeout)
    {}

    ~Epoll()
    {
        if(_epfd >= 0) close(_epfd);
    }

    // 创建 epoll 模型
    void CreateEpoll()
    {
        _epfd = epoll_create(defaultSize);
        if(_epfd < 0) exit(5);
    }

    // 将 socket 添加到 epoll 模型中
    bool AddSockToEpoll(int sock, uint32_t events)
    {
        events |= EPOLLET; // ET 模式
        struct epoll_event ev;
        ev.data.fd = sock;
        ev.events = events;
        int n = epoll_ctl(_epfd, EPOLL_CTL_ADD, sock, &ev);
        return n == 0;
    }

    bool DelSockFromEpoll(int sock)
    {
        int n = epoll_ctl(_epfd, EPOLL_CTL_DEL, sock, nullptr);
        return n == 0;
    }

    int WaitEpoll(struct epoll_event* readyEvents, int maxEvents)
    {
        return epoll_wait(_epfd, readyEvents, maxEvents, _timeout);
    }

    bool CtlEpoll(int sock, uint32_t events)
    {
        events |= EPOLLET;
        struct epoll_event ev;
        ev.data.fd = sock;
        ev.events = events;
        int n = epoll_ctl(_epfd, EPOLL_CTL_MOD, sock, &ev);
        return n == 0;
    }

private:
    int _epfd;
    int _timeout;
};

Epoll 类主要是封装了 epoll 的相关接口,方便 TcpServer 服务器进行调用。

  • CreateEpoll:TcpServer 调用该接口,即可创建一个 epoll 模型。
  • AddSockToEpoll:将 TcpServer 需要关心该 sock 的 events 事件添加到 epoll 模型中,让 epoll 来关心这些事件,事件发生后通知 TcpServer 进行处理。
  • DelSockFromEpoll:让 epoll 模型不再关心 sock 上发生的事件。
  • WaitEpoll:让 epoll 模型等待事件的发生,事件发生后通知 TcpServer 进行处理。
  • CtlEpoll:修改 epoll 模型对 sock 上发生的事件的关心。比如,之前只让 epoll 模型关心 sock 的读事件,现在让 epoll 模型也关心 sock 的写事件。

TcpServer 的设计

TcpServer 需要处理的事件

  • 读事件:如果监听套接字的读事件就绪,则调用封装好 Accepter 函数获取底层建立好的连接,并进行相应的处理。而如果是普通套接字的读事件就绪,则调用封装好的 Recver 函数读取客户单发来的数据,并进行业务处理。
  • 写事件:当写事件就绪时,就调用封装好的 Sender 函数写入到底层 TCP 的发送缓冲区中。
  • 异常事件:当某个文件描述符的异常事件发生了,我们直接将该文件描述符关闭掉,不进行过多的处理。

Connection 类的封装

#pragma once

#include <iostream>
#include <functional>
#include <vector>
#include <string>
#include <unordered_map>
#include <cassert>
#include <cerrno>
#include "Log.hpp"
#include "Sock.hpp"
#include "Epoll.hpp"
#include "Protocol.hpp"

// 前置声明
class TcpServer; 
class Connection;

using func_t = std::function<void(Connection*)>;
using callback_t = std::function<void(Connection*, std::string&)>;

// 使用 Connection 来表示一个连接
// 该连接有自己的接收缓冲区和发送缓冲区
class Connection
{
public:
    Connection(int sock = -1)
        : _sock(sock)
        , _ptr(nullptr)
    {}

    // 设置该连接的回调函数
    void SetCallBack(func_t readCallBack, func_t writeCallBack, func_t exceptCallBack)
    {
        _readCallBack = readCallBack;
        _writeCallBack = writeCallBack;
        _exceptCallBack = exceptCallBack;
    }

    ~Connection()
    {
        if(_sock >= 0) close(_sock);
    }

public:
    // 负责进行 IO 的文件描述符
    int _sock; 
    // 三个回调方法,表示对 _sock 的读事件、写事件和异常事件的处理方法
    func_t _readCallBack;
    func_t _writeCallBack;
    func_t _exceptCallBack;
    // 每个连接独占的接收缓冲区和发送缓冲区
    std::string _inBuffer; // 暂时没有办法处理二进制流,文本数据是可以处理的
    std::string _outBuffer;
    // TcpServer 的回指指针,用于业务处理
    TcpServer* _ptr;
};

Connection 类中除了包含文件描述符和其对应的读时间回调、写事件回调和异常事件回调之外,还包含一个输入缓冲区 _inBuffer、一个输出缓冲区 _outBuffer 以及一个回指指针 _ptr。

  • 当某个文件文件描述符的读事件就绪时,我们会调用 recv 函数读取客户端发过来的数据,但是这并不能保证我们读取到了一个完整的报文,因此我们需要将读取到的数据暂时存放在该文件描述符对应的接收缓冲区 _inBuffer 中。当 _inBuffer 中的数据能够分离出一个完整的报文,我们再进行业务处理,所以 _inBuffer 的本质就是用来解决粘包问题的。
  • 当处理完一个网络请求后,需要将响应的数据发送给客户端,但是我们并不能保证底层 TCP 的发送缓冲区中有足够的空间供我们写入,因此需要将要发送的响应数据暂时保存在该文件描述符对应的发送缓冲区 _outBuffer 中。等待底层 TCP 发送缓冲区中有空间时,再将 _outBuffer 中的数据发送出去。
  • Connection 类中还包含了指向 TcpServer 的回指指针,便于我们能够快速找到 TcpServer,这种操作在开源项目中非常常见,有利于我们进行软件分层。业务处理完,需要将响应发给客户端,此时我们就可以开启对文件描述符写事件的关心,然后就能够触发一次写事件就绪将响应发送出去。

TcpServer 的构造和析构

class TcpServer
{
private:
    const static int defaultPort = 8080;
    const static int defaultMaxEvents = 128;

public:
    TcpServer(int port = defaultPort, int maxEvents = defaultMaxEvents)
        : _port(port), _maxEvents(maxEvents)
    {
        // 1. 创建监听套接字
        _listenSock = Sock::Socket();
        Sock::Bind(_listenSock, _port);
        Sock::Listen(_listenSock);
        // 2. 创建 epoll 模型
        _epoll.CreateEpoll();
        // 3. 将 _listenSock 封装成 Connection 添加到 TcpServer 中
        // _listenSock 只需要关心读事件即可
        AddConnection(_listenSock, std::bind(&TcpServer::Accepter, this, std::placeholders::_1), nullptr, nullptr);
        // 4. 申请保存就绪事件的数组
        _readyEvents = new struct epoll_event[_maxEvents];
        logMessage(NORMAL, "Init TcpServer Success");
    }

    ~TcpServer()
    {
        if (_listenSock >= 0)
            close(_listenSock);
        if (_readyEvents)
            delete[] _readyEvents;
    }
    
private:
    int _listenSock; // 监听套接字
    int _port; // 端口号
    Epoll _epoll;
    std::unordered_map<int, Connection *> _connections; // 用来管理连接的哈希表
    struct epoll_event *_readyEvents; // 保存就绪事件的数组,可以用 vector 代替
    int _maxEvents; // _readyEvents 的最大容量
    // 业务处理的回调函数
    callback_t _callBack;
};

在 TcpServer 类中,除了必要的服务器的端口号、监听套接字之外,还有封装好的 _epoll、用来管理连接的哈希表以及业务处理的回调函数等等。

  • 在构造函数中,主要做的事件就是创建监听套接字、创建 epoll 模型、调用 AddConnetion 函数设置监听套接字的读事件的回调函数并将监听套接字封装成 Connection 保存到哈希表 _connections 中进行管理、申请空间用来保存就绪的事件。
  • 在析构函数中,需要调用 close 函数来关闭监听套接字,还需要释放用来保存就绪事件的空间。
  • 设置业务处理的回调函数的目的是让网络层的代码和业务层的代码解耦,避免强耦合。这样做的好处就是修改业务层的代码,并不会影响到网络层的代码。

AddConnetion 函数的介绍

class TcpServer
{
	// ...
private:
    // 将连接添加到 TcpServer 中
    void AddConnection(int sock, func_t readCallBack, func_t writeCallBack, func_t exceptCallBack)
    {
        /* ************************************************************
         * TcpServer 上将会存在大量的连接,也就是会存在大量的 Connection
         * 那么 TcpServer 就需要采用先描述再组织的方式来将大量的 Connection
         * 管理起来,在这里我是要哈希表的方式来管理
         * ************************************************************
         */

        // 1. 将文件描述符设置为非阻塞(ET模式)
        Sock::SetNonBlock(sock);
        // 2. 构建 Connection 对象
        Connection *conn = new Connection(sock);
        conn->SetCallBack(readCallBack, writeCallBack, exceptCallBack);
        conn->_ptr = this;
        // 3. 将 sock 添加到 epoll 中
        // 任何的多路转接服务器一般默认只会打开对
        // 读事件的关心,对写事件的关心按需打开
        _epoll.AddSockToEpoll(sock, EPOLLIN | EPOLLET);
        // 4. 保存文件描述符与 Connection 的映射关系
        _connections[sock] = conn;
    }
    // ...
};
  • 需要给 AddConnetion 函数传入四个参数,分别是套接字 sock,读事件回调 readCallBack、写事件回调 writeCallBack 以及异常事件回调 exceptCallBack。
  • 由于 TcpServer 采用的是 ET 模式的 epoll 模型,所以需要将套接字 sock 设置为非阻塞。
  • 申请 Connection 对象保存套接字 sock,设置 Connection 的相关回调,然后设置指向 TcpServer 的回指指针。
  • 将 sock 添加到 epoll 模型中,让 epoll 来帮我们关心 sock 上发送的事件。需要注意的是,我们只需要让 epoll 关心 sock 的读事件即可,写事件按照需要来进行开启。
  • 最后,需要将 sock 和 Connection 的映射关系保存到哈希表中。

事件分发 Dispather

class TcpServer
{
	// ...
public:
	// 根据就绪的事件进行特定事件的派发
    void Dispacther(callback_t callBack)
    {
        _callBack = callBack;
        while (true)
        {
            LoopOnce();
        }
    }
    
private:
    void LoopOnce()
    {
        int n = _epoll.WaitEpoll(_readyEvents, _maxEvents);
        for (int i = 0; i < n; ++i)
        {
            int sock = _readyEvents[i].data.fd;        // 就绪的文件描述符
            uint32_t revents = _readyEvents[i].events; // 就绪的事件

            // 将所有的异常都交给Recver或Sender来处理
            if (revents & EPOLLERR) // 文件描述符发生了错误
                revents |= (EPOLLIN | EPOLLOUT);
            if (revents & EPOLLHUP) // 对端关闭了连接
                revents |= (EPOLLIN | EPOLLOUT);

            // 如果连接存在且设置了相应的回调方法,才可以调用回调方法
            if (revents & EPOLLIN)
            {
                if (ConnectionIsExists(sock) && _connections[sock]->_readCallBack != nullptr)
                    _connections[sock]->_readCallBack(_connections[sock]);
            }
            if (revents & EPOLLOUT)
            {
                if (ConnectionIsExists(sock) && _connections[sock]->_writeCallBack != nullptr)
                    _connections[sock]->_writeCallBack(_connections[sock]);
            }
        }
    }

    // 检查连接是否存在
    bool ConnectionIsExists(int sock)
    {
        return _connections.count(sock) == 1;
    }
    // ...
};
  • Dispacther 函数需要调用者传入一个回调函数,这个回调函数是用于处理相关业务的,只要传入的回调函数不同,就能够处理不同的业务了。保存好用于处理业务的回调函数后,就需要开启事件循环了。事件循环通常以死循环的形式运行,也就是一直调用 LoopOnce 函数。
  • 在 LoopOnce 函数中,主要是调用 WaitEpoll 函数等待事件的发生。当有事件发生后,如果 sock 在哈希表中并设置了相应的回调函数,则调用相应的回调函数,也就是事件处理。

Accepter 函数的介绍

class TcpServer
{
	// ...   
private:
    // _listenSock 用来接收新连接的回调方法
    void Accepter(Connection *conn)
    {
        // 来到这里说明有新的连接到来,因为不知道底层有多少个连
        // 接,所以需要通过while循环一直获取连接。如果不是这样
        // 的话,可能会导致有些客户端长时间连接不上服务器的问题
        while (true)
        {
            std::string clientIp;
            uint16_t clientPort;
            int acceptErrno = 0;
            int sock = Sock::Accept(_listenSock, &clientIp, &clientPort, &acceptErrno);
            if (sock < 0)
            {
                if (acceptErrno == EAGAIN || acceptErrno == EWOULDBLOCK)
                    break; // 底层的连接已经全部获取完了,可以退出循环了
                else if (acceptErrno == EINTR)
                    continue; // 被信号中断,可以继续获取连接
                else
                {
                    // accept失败
                    logMessage(WARNING, "Accepter Error:%d %s", acceptErrno, strerror(acceptErrno));
                    break;
                }
            }
            // 将 sock 添加到 TcpServer 中
            if (sock >= 0)
            {
                AddConnection(sock, std::bind(&TcpServer::Recver, this, std::placeholders::_1),
                             std::bind(&TcpServer::Sender, this, std::placeholders::_1),
                             std::bind(&TcpServer::Excepter, this, std::placeholders::_1));
                logMessage(DEBUG, "Accept Client[%s:%d:%d] Success", clientIp.c_str(), clientPort, sock);
            }
        }
    }
    // ...
};
  • 在 Accepter 函数中,主要就是将底层建立好的连接全部获取上来,然后调用 AddConnection 设置 sock 的事件处理器并将连接保存在哈希表中管理起来。
  • 当 Accept 函数的返回值为 -1 时,需要检查错误码。当错误码为 EAGAIN 或 EWOULDBLOCK 时,则表示底层的连接全部获取完了,可以退出循环了。当错误码为 EINTR 时,则表明获取连接时收到了信号,继续循环即可。当错误码为其他时,则表明真的遇到了错误,那么就打印错误信息并退出循环。

Recver 函数的介绍

class TcpServer
{
	// ...   
private:
    // 普通套接字读事件就绪的回调方法
    void Recver(Connection *conn)
    {
        const int num = 1024;
        bool err = false; // 表示对端是否关闭连接
        while (true)
        {
            char buffer[num];
            ssize_t n = recv(conn->_sock, buffer, sizeof(buffer) - 1, 0);
            if (n < 0)
            {
                if (errno == EAGAIN || errno == EWOULDBLOCK)
                    break;
                else if (errno == EINTR)
                    continue;
                else
                {
                    logMessage(ERROR, "Recver Error:%d %s", errno, strerror(errno));
                    conn->_exceptCallBack(conn);
                    err = true;
                    break;
                }
            }
            else if (n == 0)
            {
                logMessage(DEBUG, "Client[%d] Quit, Me Too", conn->_sock);
                conn->_exceptCallBack(conn);
                err = true;
                break;
            }
            else
            {
                // 读取底层数据成功
                buffer[n] = 0;
                conn->_inBuffer += buffer; // 将读取到的事件添加到接受缓存中
            }
        }

        if (!err)
            logMessage(DEBUG, "Client[%d] Send Message:%s", conn->_sock, conn->_inBuffer.c_str());
        if (!err)
        {
            std::vector<std::string> messages;
            // 对 _inBuffer 进行切分,并将切分结果放入到  messages 中
            SpliteMessage(conn->_inBuffer, messages);
            // 将分割出来的独立报文交给具体业务进行处理
            // 在这里可以将message封装成为task,然后push
            // 到任务队列,任务处理交给后端线程池
            for (auto &msg : messages)
                _callBack(conn, msg); // msg 就是一个完整的报文
        }
    }
    // ...
};
  • Recver 函数是普通套接字读事件就绪的回调方法。
  • 在 Recver 函数中,主要是调用 recv 函数将底层收到的数据获取上来,然后根据 recv 函数的返回值 n 来进行具体的操作。
  • 当 n 大于 0 时,则说明获取底层的数据成功。但这并不意味着底层的数据全部获取完了,所以还需要进行读取,不能退出循环。当 n 等于 0 时,则说明对端关闭连接,需要调用异常事件处理函数 Excepter 进行相应的异常处理,然后直接 return 返回。当 n 小于 0 时,则需要关心错误码 errno 了。当 errno 为 EAGAIN 或 EWOULDBLOCK 时,则表示底层的数据全部获取完了,可以退出循环了。当 errno 为 EINTR 时,则表明获取数据时收到了信号,继续循环即可。当 errno 为其他时,则表明真的遇到了错误,那么就打印错误信息,调用 Excepter 函数,然后直接 return 返回。
  • 当退出 while 循环时,则需要对获取到的数据进行解析,然后再进行相关的业务处理。
    -注:对端关闭连接,也会让文件描述符的读事件就绪。

Sender 函数的介绍

class TcpServer
{
	// ...   
public:
    void EnableReadWrite(Connection *conn, bool readAble, bool writeAble)
    {
        uint32_t events = ((readAble ? EPOLLIN : 0) | (writeAble ? EPOLLOUT : 0));
        bool ret = _epoll.CtlEpoll(conn->_sock, events);
        assert(ret);
    }
    
private:
    void Sender(Connection *conn)
    {
        while (true)
        {
            ssize_t n = send(conn->_sock, conn->_outBuffer.c_str(), conn->_outBuffer.size(), 0);
            if (n > 0)
            {
                conn->_outBuffer.erase(0, n);
                if (conn->_outBuffer.empty())
                    break;
            }
            else
            {
                if (errno == EAGAIN || errno == EWOULDBLOCK) // 底层TCP的发送缓冲区没有空间了
                    break;
                else if (errno == EINTR)
                    continue;
                else
                {
                    logMessage(ERROR, "Sender Error:%d %s", errno, strerror(errno));
                    conn->_exceptCallBack(conn);
                    return;
                }
            }
        }

        // 如果数据发送完了,则需要取消对conn->_sock写事件的关心
        // 如果数据没有发送完,则不需要取消对conn->_sock写事件的关心
        if (conn->_outBuffer.empty())
            EnableReadWrite(conn, true, false);
        else
            EnableReadWrite(conn, true, true);
    }
    // ...
};
  • Sender 函数是将响应发回给客户端,但是发送可能会出现各种情况,因此我们要对 send 的返回值 n 进行判断。
  • 当 n 大于 0 时,则说明数据已经成功拷贝到底层 TCP 的发送缓冲区中了。当 n 小于等于 0 时,则需要判断错误码。当错误码为 EAGAIN 或 EWOULDBLOCK 时,则说明底层 TCP 的发送缓冲区已经满了,退出循环,下次再调用 send。当错误码为 EINTR 时,则说明收到了信号,继续循环调用 send。当错误等于其他时,则说明遇到了错误,调用 Excepter 函数,然后直接 return 返回。
  • 退出 while 循环后,如果 Connection 发送缓冲区中的数据都拷贝到底层 TCP 的发送缓冲区了,则关闭对该文件描述符写事件的关心。否则,保持对该文件描述符写事件的关心。

Excepter 函数的介绍

class TcpServer
{
	// ...   
private:
    void Excepter(Connection *conn)
    {
        if (!ConnectionIsExists(conn->_sock))
            return;
        // 1. 从epoll中移除conn->_sock
        int n = _epoll.DelSockFromEpoll(conn->_sock);
        assert(n);
        (void)n;
        // 2. 从_connections中移除conn->_sock
        _connections.erase(conn->_sock);
        // 3. 释放conn所占用的资源(Connect的析构函数会关闭文件描述符)
        delete conn;
    }
    // ...
};
  • Excepter 函数的主要工作就是移除 epoll 模型对文件描述符的关心、从哈希表中删除文件描述符以及释放 Connection。

协议定制

#pragma once

#include <iostream>
#include <vector>
#include <string>
#include <cstring>

// 使用特殊字符作为分隔符来解决粘包问题,处理独立报文
#define SEP "X"
#define SEP_LEN strlen(SEP)
// 用于序列化和反序列化
#define SPACE " "
#define SPACE_LEN strlen(SPACE)

// buffer: 输入输出型参数
// out: 输出型参数
// buffer中可能会存在多个报文,没有报文是以SEP作为分隔符的
// 我们想要将不同的报文切分好,放入到out中
void SpliteMessage(std::string& buffer, std::vector<std::string>& out)
{
    while(true)
    {
        auto pos = buffer.find(SEP);
        if(pos == std::string::npos) break;
        std::string message = buffer.substr(0, pos);
        buffer.erase(0, pos + SEP_LEN);
        out.emplace_back(message);
    }    
}

// 给报文加上分隔符
std::string Encode(std::string& s)
{
    return s + SEP;
}

// 去除报文中的分割符
std::string Decode(std::string& s)
{
    auto pos = s.find(SEP);
    // s中没有一个完整的报文时,返回空串
    // s中有一个完整的报文时,返回该报文
    if(pos == std::string::npos) return "";
    std::string ret = s.substr(0, pos);
    s.erase(0, pos + SEP_LEN);
    return ret;
}

// 网络版计算器的 Request 和 Response
class Request
{
public:
    // 序列化
    std::string Serialize()
    {
        std::string str;
        str = std::to_string(_x);
        str += SPACE;
        str += _op;
        str += SPACE;
        str += std::to_string(_y);
        return str;
    }

    // 反序列化
    bool Deserialized(const std::string &str) 
    {
        std::size_t left = str.find(SPACE);
        if (left == std::string::npos)
            return false;
        std::size_t right = str.rfind(SPACE);
        if (right == std::string::npos)
            return false;
        _x = atoi(str.substr(0, left).c_str());
        _y = atoi(str.substr(right + SPACE_LEN).c_str());
        if (left + SPACE_LEN > str.size())
            return false;
        else
            _op = str[left + SPACE_LEN];
        return true;
    }

public:
    Request()
    {}

    Request(int x, int y, char op) 
        : _x(x)
        , _y(y)
        , _op(op)
    {}

    ~Request() {}

public:
    int _x;   
    int _y;   
    char _op; 
};

class Response
{
public:
    // 序列化
    std::string Serialize()
    {

        std::string s;
        s = std::to_string(_code);
        s += SPACE;
        s += std::to_string(_ret);

        return s;
    }

    // 反序列化
    bool Deserialized(const std::string &s)
    {
        std::size_t pos = s.find(SPACE);
        if (pos == std::string::npos)
            return false;
        _code = atoi(s.substr(0, pos).c_str());
        _ret = atoi(s.substr(pos + SPACE_LEN).c_str());
        return true;
    }

public:
    Response()
    {}

    Response(int ret, int code) 
        : _ret(ret)
        , _code(code)
    {}

    ~Response() {}

public:
    int _ret; // 计算结果
    int _code;   // 计算结果的状态码
};

我们的业务还是一个网络版的计算器,我们需要定制协议来解决粘包问题。

  • SpliteMessage 函数的作用就是以 SEP 为分割符,将 Connection 的接受缓冲区中的数据分割成一个个的报文,然后交给业务层进行处理。
  • Encode 函数的作用是给报文加上分割符,以明确报文和报文之间的边界,以解决粘包问题。Decode 函数的作用是去除报文中的分割符并返回一个完整的报文,如果没有一个完整的报文,则返回空串。

服务端的编写

#include "TcpServer.hpp"
#include <memory>
#include <unordered_map>
#include <fstream>

static Response Calculator(const Request &req)
{
    Response resp(0, 0);
    switch (req._op)
    {
    case '+':
        resp._ret = req._x + req._y;
        break;
    case '-':
        resp._ret = req._x - req._y;
        break;
    case '*':
        resp._ret = req._x * req._y;
        break;
    case '/':
        if (req._y == 0)
            resp._code = 1; // 除零错误
        else
            resp._ret = req._x / req._y;
        break;
    case '%':
        if (req._y == 0)
            resp._code = 2; // 模零错误
        else
            resp._ret = req._x % req._y;
        break;
    default:
        resp._code = 3;
        break;
    }
    return resp;
}

void NetCal(Connection* conn, std::string& request)
{
    logMessage(DEBUG, "NetCal Been Called, Get Request: %s", request.c_str());
    // 1. 反序列化
    Request req;
    if(!req.Deserialized(request)) 
    {
        // 按道理来说,来到这里收到的一定是一个完整的报文
        // 但现在却反序列化失败,输出一下错误日志
        logMessage(ERROR, "Request Deserialized Error");
        return; 
    }
    // 2. 业务处理
    Response resp = Calculator(req);
    // 3. 序列化,构建应答
    std::string sendStr = resp.Serialize();
    sendStr = Encode(sendStr);
    // 4. 业务层不需要关心数据如何发送给客户端,只需要将序
    // 列化后的应答交给TcpServer,让它将应答发送给客户端
    conn->_outBuffer += sendStr;
    // 一旦开启对EPOLLOUT的关心,epoll会立即触发一次发送事件就绪
    // 如果后续保持对EPOLL的关心,TcpServer会一直进行发送
    conn->_ptr->EnableReadWrite(conn, true, true);
}

int main()
{
    std::unique_ptr<TcpServer> svr(new TcpServer());
    svr->Dispather(NetCal);
    
    return 0;
}

创建一个 TcpServer,然后调用它的 Dispatcher 函数。调用 Dispatcher 函数时,需要传入业务处理的回调函数。传入的回调函数不同,就可以处理不同的业务了。

客户端的编写

#include <iostream>
#include <string>
#include <ctime>
#include "Sock.hpp"
#include "Protocol.hpp"

// 客户端调用
void Send(int sock, const std::string& sendStr)
{
    int n = send(sock, sendStr.c_str(), sendStr.size(), 0);
    if(n != sendStr.size())
       std::cerr << "Send Error" << std::endl; 
}

// 客户端调用
bool Recv(int sock, std::string& out)
{
    char buffer[1024];
    while(true)
    {
        ssize_t n = recv(sock, buffer, sizeof(buffer) - 1, 0);
        if(n > 0)
        {
            buffer[n] = 0;
            out += buffer;
        }
        else if(n == 0)
        {
            return false;
        }
        else
        {
            if(errno == EAGAIN || errno == EWOULDBLOCK)
            {
                if(out.empty()) continue; // 如果没有收到响应,则继续对应 recv 函数
                return true;
            }
            else if(errno == EINTR)
            {
                continue;
            }
            else
            {
                std::cerr << "Recv Error" << std::endl;
                return false;
            }
        }
    }
}

static void Usage(char* proc)
{
    std::cout << "\nUsage: " << proc << "serverIp serverPort" << std::endl;
}

// 网络版计算器客户端
int main(int argc, char* argv[])
{
    srand((unsigned)time(nullptr));
    if(argc != 3)
    {
        Usage(argv[0]);
        exit(1);
    }

    int sock = Sock::Socket();
    if(!Sock::Connect(sock, std::string(argv[1]), atoi(argv[2])))
    {
        std::cerr << "Connect Error: " << errno << " " << strerror(errno) << std::endl; 
    }

    const char* op = "+-*/%#";
    Sock::SetNonBlock(sock);
    std::string buffer;
    while(true)
    {
        Request req;
        // req._x = rand() % 100;
        // req._y = rand() % 100;
        // req._op = op[rand() % 6];
        std::cout << "Please Enter x: ";
        std::cin >> req._x;
        std::cout << "Please Enter op: ";
        std::cin >> req._op;
        std::cout << "Please Enter y: ";
        std::cin >> req._y;
        // 序列化
        std::string sendStr = req.Serialize();
        sendStr = Encode(sendStr);
        // 发送请求
        Send(sock, sendStr);
        // 非阻塞读取全部数据
        bool ret = Recv(sock, buffer);
        if(!ret) break; // 服务器关闭连接或者Recv出错了

        std::string package = Decode(buffer);
        if(package.empty()) continue;
        // 接收到了一个完整的报文
        Response resp;
        resp.Deserialized(package);
        std::string err;
        switch(resp._code)
        {
            case 1:
                err = "Division By Zero Error";
                break;
            case 2:
                err = "Modular Division By Zero Error";
                break;
            case 3:
                err = "Operation Error";
                break;
            default:
                std::cout << "Calculation Result: " << resp._ret << std::endl;
                break;
        }
        if(!err.empty()) std::cout << err << std::endl;
    }
    close(sock);

    return 0;
}

综合演示

在这里插入图片描述

将业务替换成在线词典

服务端

#define FILENAME "OnlineDict.txt"

// 加载单词
void LoadWord(std::unordered_map<std::string, std::string>& m)
{
    std::ifstream ifs(FILENAME);
    std::string key, value;
    while(ifs >> key >> value)
    {
        m[key] = value;
    }
}

void OnlineDict(Connection* conn, std::string& request)
{
    static std::unordered_map<std::string, std::string> m;
    // 加载单词且只加载一次
    if(m.empty()) LoadWord(m);
    // 观察单词是否加载成功
    // for(auto& kv : m)
    // {
    //     std::cout << kv.first << ":" << kv.second << std::endl;
    // }
    logMessage(DEBUG, "NetCal Been Called, Get Request: %s", request.c_str());
    // request就是要查找的单词
    auto it = m.find(request);
    std::string sendStr;
    if(it == m.end())
        sendStr = request +  " Is Not Found";
    else
        sendStr = request + " Means " + m[request];
    
    sendStr = Encode(sendStr); // 添加分隔符

    conn->_outBuffer += sendStr;
    // 一旦开启对EPOLLOUT的关心,epoll会立即触发一次发送事件就绪
    // 如果后续保持堆EPOLL的关心,TcpServer会一直进行发送
    conn->_ptr->EnableReadWrite(conn, true, true);
}

int main()
{
    std::unique_ptr<TcpServer> svr(new TcpServer());
    svr->Dispacther(OnlineDict);
    
    return 0;
}

客户端

int main(int argc, char* argv[])
{
    srand((unsigned)time(nullptr));
    if(argc != 3)
    {
        Usage(argv[0]);
        exit(1);
    }

    int sock = Sock::Socket();
    if(!Sock::Connect(sock, std::string(argv[1]), atoi(argv[2])))
    {
        std::cerr << "Connect Error: " << errno << " " << strerror(errno) << std::endl; 
    }
    Sock::SetNonBlock(sock);

    std::string word;
    std::string buffer;
    while(true)
    {
        std::cout << "Please Enter The Word: ";
        std::cin >> word;
        word = Encode(word);

        Send(sock, word);
        // 非阻塞读取全部数据
        bool ret = Recv(sock, buffer);
        if(!ret) break; // 服务器关闭连接或者Recv出错了
        std::string package = Decode(buffer);
        std::cout << package << std::endl;
    }

    return 0;
}

在这里插入图片描述

在这里插入图片描述

👉总结👈

本篇博客主要讲解了什么是 Reactor 模式、Reactor 模式的组件、Reactor 模式的工作流程以及基于 Reactor 模式的 TCP 服务器等等。以上就是本篇博客的全部内容了,如果大家觉得有收获的话,可以点个三连支持一下!谢谢大家啦!💖💝❣️

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/867132.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

生成式人工智能模型:提升营销分析用户体验

使用生成式人工智能来改善分析体验&#xff0c;使业务用户能够询问有关我们数据平台中可用数据的任何信息。 在本文中&#xff0c;我们将解释如何使用新的生成式人工智能模型 ( LLM ) 来改善业务用户在我们的分析平台上的体验。假设我们为零售销售经理提供 Web 应用程序或移动应…

【雕爷学编程】Arduino动手做(24)---水位传感器模块3

37款传感器与模块的提法&#xff0c;在网络上广泛流传&#xff0c;其实Arduino能够兼容的传感器模块肯定是不止37种的。鉴于本人手头积累了一些传感器和执行器模块&#xff0c;依照实践出真知&#xff08;一定要动手做&#xff09;的理念&#xff0c;以学习和交流为目的&#x…

斯坦福「小镇」开源AI智能体;小米应用商店将要求AI应用符合资质标准

&#x1f989; AI新闻 &#x1f680; 斯坦福「小镇」开源AI智能体 摘要:斯坦福研究人员开源了一个类似《西部世界》的数字化「小镇」,里面有25个AI智能体可以生活、工作、社交。这项研究被视为AGI的重要开端,可能会改变游戏、企业应用领域。网友期待这项技术改善游戏NPC的交互…

【Fegin技术专题】「原生态」打开Fegin之RPC技术的开端,你会使用原生态的Fegin吗?(上)

前提介绍 Feign是SpringCloud中服务消费端的调用框架&#xff0c;通常与ribbon&#xff0c;hystrix等组合使用。由于遗留原因&#xff0c;某些项目中&#xff0c;整个系统并不是SpringCloud项目&#xff0c;甚至不是Spring项目&#xff0c;而使用者关注的重点仅仅是简化http调…

软工导论知识框架(七)面向对象设计

一.设计准则 分析&#xff1a;提取、整理用户需求&#xff0c;建立问题域精确模型。设计&#xff1a;转变需求为系统实现方案&#xff0c;建立求解域模型。 在实际的软件开发过程中分析和设计的界限是模糊的&#xff0c;分析和设计活动是一个多次反复迭代的过程。分析的结果可…

无涯教程-Perl - msgsnd函数

描述 此功能使用可选的FLAGS将消息MSG发送到消息队列ID。 语法 以下是此函数的简单语法- msgsnd ID, MSG, FLAGS返回值 该函数在错误时返回0,在成功时返回1。 Perl 中的 msgsnd函数 - 无涯教程网无涯教程网提供描述此功能使用可选的FLAGS将消息MSG发送到消息队列ID。 语法…

接地电阻测试仪的原理和使用事项

接地电阻测试仪&#xff08;Ground Resistance Tester&#xff09;是用来测量接地电阻的一种仪器。接地系统是指用于保护人员和设备的设施&#xff0c;它将电流引导到地下&#xff0c;将任何潜在危险的电流导向地面。 接地电阻测试仪的作用是通过测量接地系统的电阻值来评估其…

C++ STL string类模拟实现

目录 string类成员变量 一.构造函数 二.析构函数 三.拷贝构造 四.size(),capacity() 五.operator [ ] 六. operator 七.字符串比较 八.reserve&#xff08;&#xff09; 九.push_back&#xff08;&#xff09;&#xff0c;append&#xff08;&#xff09; 十.operato…

【雕爷学编程】Arduino动手做(12)---霍尔磁场传感器模块5

37款传感器与模块的提法&#xff0c;在网络上广泛流传&#xff0c;其实Arduino能够兼容的传感器模块肯定是不止37种的。鉴于本人手头积累了一些传感器和执行器模块&#xff0c;依照实践出真知&#xff08;一定要动手做&#xff09;的理念&#xff0c;以学习和交流为目的&#x…

数据归一化:优化数据处理的必备技巧

文章目录 &#x1f340;引言&#x1f340;数据归一化的概念&#x1f340;数据归一化的应用&#x1f340;数据归一化的注意事项与实践建议&#x1f340;代码演示&#x1f340;在sklearn中使用归一化&#x1f340;结语 &#x1f340;引言 在当今数据驱动的时代&#xff0c;数据的…

Vue在页面输出JSON对象,测试接口可复制使用

效果图&#xff1a; 数据处理前&#xff1a; 数据处理后&#xff1a; 代码实现&#xff1a; HTML: <el-table height"600" :data"tableData" border style"width: 100%" tooltip-effect"dark" size"mini"><el-…

Django笔记之数据库函数之日期函数

日期函数主要介绍两个大类&#xff0c;Extract() 和 Trunc() Extract() 函数作用是提取日期&#xff0c;比如我们可以提取一个日期字段的年份&#xff0c;月份&#xff0c;日等数据 Trunc() 的作用则是截取&#xff0c;比如 2022-06-18 12:12:12&#xff0c;我们可以根据需求…

深度学习基础知识笔记

深度学习要解决的问题 1 深度学习要解决的问题2 应用领域3 计算机视觉任务4 视觉任务中遇到的问题5 得分函数6 损失函数7 前向传播整体流程8 返向传播计算方法1 梯度下降9 神经网络整体架构 11 神经元个数对结果的影响12 正则化和激活函数1 正则化2 激活函数 13 神经网络过拟合…

人工智能可解释性(二)(梯度计算,积分梯度等)

目录 1.定义 2.详述 2.1局部解释 可视化方法 梯度计算 2.2积分梯度Integrated Gradients&#xff08;梯度计算进阶&#xff09; 2. 3全局解释 2.3.1Activation Maximization 2.3.2GAN,VAE 2. 4用一个可解释模型解释不可解释模型 2. 4.1LIME 局部解释 参考文献 1.定义 可…

access怎么做进销存?借助access开发进销存管理应用

我不太推荐使用Access&#xff0c;因为他的缺点还是比较明显的&#xff1a; 1、软件自身限制 不能用于互联网&#xff1a;使用Access制作好的管理软件&#xff0c;访问页只能在局域网中使用&#xff1b;只能在Windows上运行&#xff1a;Access仅支持windows的运行环境&#x…

从零开始学习 Java:简单易懂的入门指南之多态(十)

多态&包&final&权限修饰符&代码块 第一章 多态1.1 多态的形式1.2 多态的使用场景1.3 多态的定义和前提1.4 多态的运行特点1.5 多态的弊端1.6 引用类型转换1.6.1 为什么要转型1.6.2 向上转型&#xff08;自动转换&#xff09;1.6.3 向下转型&#xff08;强制转换…

【将回声引入信号中】在语音或音频文件中引入混响或简单回声,以研究回声延迟和回波幅度对生成的回波信号感知的影响(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…

积分代换和周期函数

昨晚上看书&#xff0c;有一个稳定随机过程的例题&#xff0c;涉及积分上下限代换、周期函数的微积分性质等知识点。这种题型以前肯定接触过&#xff0c;当下遇到了&#xff0c;思维仍然迷迷糊糊&#xff0c;像是一团乱麻&#xff0c;纠缠不清&#xff0c;照着答案思考了半天&a…

[Blender]Geometry nodes altermesh to UE

首先要先下载插件 AlterMesh – Use geometry nodes inside Unreal 下载对应版本的插件后 打开UE&#xff0c;在对应的设置里面挂上blender.exe的路径 去官方下载一个Blender Geometry nodes 的示例 Demo Files — blender.org​​​​​​

沃罗诺伊图(Voronoi):迷人的世界【1/2】

一、说明 Voronoi图&#xff08;也称为狄利克雷镶嵌或泰森多边形&#xff09;在自然界中无处不在。你已经遇到过他们数千次了&#xff0c;但也许没有这样称呼它。Voronoi图很简单&#xff0c;但它们具有令人难以置信的特性&#xff0c;在制图&#xff0c;生物学&#xff0c;计算…