【项目】Reactor模式的服务器

news2025/1/15 19:58:27

目录

Reactor完整代码连接

前置知识:

 1.普通的epoll读写有什么问题?

2.Connection内的回调函数是什么

3.服务器的初始化(Connection只是使用的一个结构体)

4.等待就绪事件:有事件就绪,对使用Connection的不同对象(封装fd,回调方法)调用对应的回调方法;

5._listenSock读取 :Accepter函数获取新链接怎么处理的

6.普通套接字读取:Recver

7.对写事件的关心是按需关系的:

8.执行效果:

9.Reactor的优势:


Reactor完整代码连接

前置知识:

Reactor叫做反应堆模式;反应:对已经就绪的事件(读、写、异常)进行处理;

我们使用epoll来实现,select、poll、epoll是多路转接的发展史,epoll完善了select、poll的缺点;

  • 需要程序员维护数组                                      select/poll都有这个缺点    
  • 有大量的遍历                                                 select/poll都有这个缺点 
  • 大量参数为输入输出型参数,需要重新设置  select有
  • 管理的fd有上限                                              select有

 1.普通的epoll读写有什么问题?

  • 使用的是一个静态数组读取;报文长,读到就是不完整报文,报文短,一次读取可能有多个报文,最后一个报文可能不是完整的;
  • 这样的错误报文没法分析和处理,就不能构造响应报文;

综上所述:问题为没法保证读取到的是完整报文,导致没法分析和处理、构建响应报文;

    void Read(int fd)
    {
        char buff[1024];
        ssize_t s = read(fd, buff, 1023);
        if(s > 0){
            buff[s] = 0;
            LOG2(INFO, buff, fd);
        }

解决办法:将文件描述符封装,并且有接受发送缓冲区,使用string就可以;

  1. 使用静态数组读取,然后添加到接受缓冲区保存;
  2. 读取完毕,对接受缓冲区分析是否有完整报文;
  3. 处理完整请求报文,构建响应报文添加到发送缓冲区;
using func_t = std::function<void(Connection*)>;
using cals_t = std::function<void(std::string &, Connection*)>;

class Connection{
public:
    Connection(int fd = -1 ):_fd(fd), _ts(nullptr)
    {}
    ~Connection()
    {
        if(_fd >= 0)
            close(_fd);
    }
public:
    int _fd;

    //读写异常回调方法
    func_t _recver;
    func_t _sender;
    func_t _exception;

    //接受缓冲区
    std::string _outbuff;
    //发送缓冲区
    std::string _inbuff;

    //TcpServer的回指指针,对写事件的关心是按需打开
    TcpServer *_ts;
    
    //连接最近活跃活动的时间
    time_t _times;
};

2.Connection内的回调函数是什么

一个包装器;返回值为void,参数为Connection*,的函数指针、仿函数、lamada表达,它都可以接受;

using func_t = std::function<void(Connection*)>;

优势:

  • _listenSock是读方法是接受新连接,普通是读取请求报文
  • 初始化时设置读写异常回调方法(回调:使用函数指针执行的函数),不需要判断是_listenSock还是普通套接字,统一使用con->_recver;

3.服务器的初始化(Connection只是使用的一个结构体)

  1. 套接字创建,bind,监听;
  2. 构建epoll模型,epoll函数也封装了,_epfd封装在Epoll类内;
  3. 初始化_listenSock    Connection结构体;读取回调方法Accept是类函数,多一个this指针,需要使用std::bind来改变参数个数,进行传递给包装器
  4. epoll_wait的事件就绪队列初始化;
class TcpServer{
    const static int gport = 8080;
    const static int gnum = 128;
    TcpServer(int port = gport, int num = gnum):_port(gport), _evts_num(num)
    {
        //套接字,创建bind监听
        _listenSock = Sock::Socket();
        Sock::Bind(_listenSock,_port);
        Sock::Listen(_listenSock);

        //构建epoll模型
        _epoll.CreateEpoll();

        //listensock添加epoll模型和_connections管理起来
        AddConnection(_listenSock, std::bind(&TcpServer::Accepter, this, std::placeholders::_1), nullptr, nullptr);

        //epoll_wait就绪队列,获取已就绪的事件
        _evts = new epoll_event[_evts_num];
    }
    ~TcpServer()
    {
        if(_listenSock >= 0)
            close(_listenSock);
        if(_evts != nullptr)
            delete[] _evts;
        for(auto &pr : _connections)
        {
            _connections.erase(pr.first);
            delete pr.second;
        }
    }
private:
    int _listenSock;

    //epoll
    Epoll _epoll;
    //就绪队列
    epoll_event* _evts;
    int _evts_num;

    //管理connection对象
    std::unordered_map<int, Connection*> _connections;
    int _port;

    //业务处理的回调指针
    cals_t _cb;
};

4.等待就绪事件:有事件就绪,对使用Connection的不同对象(封装fd,回调方法)调用对应的回调方法;

    void LoopOnce()
    {
        int n = _epoll.WaitEpoll(_evts, _evts_num);
        //有事件就绪
        if(n > 0)
        {
            //LOG2(INFO, "epoll wait success",fd);
            for(int i=0; i<n; i++)
            {
                int fd = _evts[i].data.fd;
                int events = _evts->events;

                //连接关闭或者错误,改为读写统一处理,读写失败调异常处理;
                if( events & EPOLLHUP){
                    LOG2(INFO,"连接关闭",fd);
                    events |= (EPOLLIN | EPOLLOUT);
                }
                if( events & EPOLLERR){
                    LOG2(INFO,"错误",fd);
                    events |= (EPOLLIN | EPOLLOUT);
                } 

                if(_connections.count(fd) && events & EPOLLIN)
                {
                    if(IsConnectionExits(fd) && _connections[fd]->_recver != nullptr){
                        _connections[fd]->_recver(_connections[fd]);
                    }
                }
                if(_connections.count(fd) && events & EPOLLOUT)
                {
                    if(IsConnectionExits(fd) && _connections[fd]->_sender != nullptr)
                        _connections[fd]->_sender(_connections[fd]);
                }
            }
        }
        else if(n == 0)
        {
            LOG(INFO, "timeout");
        }
        else
        {
            LOG(FATAL,"epoll wait fail");
            exit(4);
        }
    }
    void Dispatcher(cals_t cb)
    {
        _cb = cb;
        while(true)
        {
            //去除不活跃的连接
            DeleteInactivity();
            LoopOnce();
        }
    }

5._listenSock读取 :Accepter函数获取新链接怎么处理的

  • 得到新连接,如果新的fd是合法的,设置对应的读写异常回调方法,读:读取请求报文,写:发送响应报文,异常:出现错误进行处理;
  • 所有的套接字都是使用ET模式(通知效率高,只支持非阻塞读写),EPOLLET因该被设置;
void Accepter(Connection * con)
    {
        while(true)
        {
            con->_times = time(nullptr);
            struct sockaddr_in tmp;
            socklen_t tlen = sizeof(tmp);
            int new_sock = accept(con->_fd, (struct sockaddr *)&tmp, &tlen);
            if(new_sock < 0)
            {
                //所以事件处理完毕
                if(errno == EAGAIN || errno == EWOULDBLOCK)
                    break;
                else if(errno == EINTR)//可能被信号中断,概率极小
                    continue;
                else
                {
                    std::cout << "accept fail , errno :" << errno << strerror(errno) << std::endl;
                    break;
                }
            } 
            else//添加到epoll模型和_connections中管理;
            {
                if(AddConnection(new_sock, std::bind(&TcpServer::Recver, this, std::placeholders::_1), 
                                std::bind(&TcpServer::Sender, this, std::placeholders::_1),
                                std::bind(&TcpServer::Exception, this, std::placeholders::_1)
                                ))
                    LOG2(INFO, "add connection success",new_sock);
                else
                    LOG2(RISK, "add connection fail", new_sock);
            }
        }
    }

6.普通套接字读取:Recver

  1. 一直读取,直到错误或者读取完毕;每次读取的结果都放到接受缓冲区;
  2. 读取完毕,对接受缓冲区处理,拿出一个个完整的报文,对请求报文进行业务处理;
    void Recver(Connection *con)
    {
        con->_times = time(nullptr);
        const int buff_size = 1024;
        char buff[buff_size];
        while(true)
        {
            ssize_t s = recv(con->_fd, buff, buff_size - 1, 0);
            if(s > 0)
            {
                buff[s] = 0;
                con->_outbuff += buff;
            }
            else if(s == 0)
            {
                LOG2(INFO, "写端关闭", con->_fd);
                con->_exception(con);
                return;
            }
            else
            {
                //读取完毕
                if(errno == EAGAIN || errno == EWOULDBLOCK )
                {
                    LOG2(INFO, "处理完毕", con->_fd);
                    break;
                }
                else if(errno == EINTR)
                    continue;
                else
                {
                    LOG2(ERROR, "recv fail ,fd: ", con->_fd);
                    con->_exception(con);
                    return;
                }
            }
        }
        std::cout << "fd: " << con->_fd << "outbuff: " << con->_outbuff <<std::endl;
        //对outbuff内的完整报文,进行处理
        std::vector<std::string> out;
        //分隔报文,函数在protocol.hpp
        SplitMessage(out, con->_outbuff);
        for(auto &s : out)
            _cb(s, con);//业务逻辑回调指针,在主函数

    }

7.对写事件的关心是按需关系的:

  • 如果开启关心,还没有数据发送,写事件会一直就绪;所以按需关心;
  • 请求报文业务处理完毕,构建好响应报文,一定有响应,打开对写事件的关心;
  • 也是Connection为什么封装一个Tcperver指针的原因,这里开启写事件的关心;
//业务处理
void CalArguments(std::string &str, Connection *con)
{
    //请求报文反序列化
    Request req;
    //std::cout<<str <<std::endl;
    if(!req.Deserialize(str)){
        LOG2(ERROR, "deseroalize fail" ,con->_fd);
        return;
    }

    //对数据处理
    Response res;
    calculator(req, res);

    //响应报文序列化
    std::string s = res.Serialize();

    //添加到inbuff
    con->_inbuff += s;

    //一定有响应报文,打开写事件的关系
    con->_ts->EnableReadWrite(con->_fd, true, true);
}

8.执行效果:

  • 我写的协议是对任意两个数加减乘除;
  • 每个请求或者响应都是用 x  做为分割符的;

9.Reactor的优势:

和进程/线程做比较:

  • 它是一个单进程的就可以并发处理请求的服务器,比进程/线程减少了创建、销毁、调度的时间;
  • 它的等待一批fd,减少了单位等待时间;一个线程等待对应一个fd;
  • 有很高的复用性,替换业务逻辑就行了;

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/952362.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

MATLAB实现AHP层次分析法——以情人节选取礼物为例

问题背景&#xff1a; 情人节来临之际&#xff0c;广大直男&#xff08;女&#xff09;同胞在给异性朋友选购礼物时会遇到难题——什么才是礼物好坏最重要的标准&#xff1f;基于层次分析法AHP进行计算&#xff0c;得出最高权重的指标&#xff0c;给出各位朋友选购礼物的一种思…

Vector<T> 动态数组(模板语法)

C数据结构与算法 目录 本文前驱课程 1 C自学精简教程 目录(必读) 2 动态数组 Vector&#xff08;难度1&#xff09; 其中&#xff0c;2 是 1 中的一个作业。2 中详细讲解了动态数组实现的基本原理。 本文目标 1 学会写基本的C类模板语法&#xff1b; 2 为以后熟练使用 S…

java.lang.classnotfoundexception: com.android.tools.lint.client.api.vendor

Unity Android studio打包报错修复 解决方式 java.lang.classnotfoundexception: com.android.tools.lint.client.api.vendor 解决方式 在 launcherTemplate 目录下找到 Android/lintOptions 选项 加上 checkReleaseBuilds false lintOptions { abortOnError false checkRelea…

以GitFlow分支模型为基准的Git版本分支管理流程

以GitFlow分支模型为基准的Git版本分支管理流程 文章目录 以GitFlow分支模型为基准的Git版本分支管理流程GitFlow分支模型中的主要概念GitFlow的分支管理流程图版本号说明借助插件Git Flow Integration Plus实现分支模型管理其他模型TBD模型阿里AoneFlow模型 GitFlow分支模型中…

设计模式的使用——建造者模式+适配器模式

项目代码地址 一、需求介绍 现公司数据库有一张表中的数据&#xff0c;需要通过外部接口将数据推送到别人的系统中。现有的问题是&#xff1a; 数据字段太多&#xff0c;而且双方系统实体字段不一致&#xff0c;每次都要通过get、set方法去对数据取值然后重新赋值。如果后期需…

使用php实现微信登录其实并不难,可以简单地分为三步进行

使用php实现微信登录其实并不难&#xff0c;可以简单地分为三步进行。 第一步&#xff1a;用户同意授权&#xff0c;获取code //微信登录public function wxlogin(){$appid "";$secret "";$str"http://***.***.com/getToken";$redirect_uriu…

家政电子邮件营销怎么做?邮件营销的方案?

家政电子邮件营销的作用&#xff1f;企业如何利用营销邮件拓客&#xff1f; 随着科技的不断发展&#xff0c;家政服务行业也逐渐融入了电子邮件营销的方式&#xff0c;这为家政企业提供了与客户更紧密互动的机会。在本文中&#xff0c;我们将探讨家政电子邮件营销的几个关键步…

OLED透明屏显示技术:未来显示科技的领航者

OLED透明屏显示技术是一种创新性的显示技术&#xff0c;它的特殊性质使其成为未来显示科技的领航者。 OLED透明屏具有高对比度、快速响应时间、广视角和低功耗等优势&#xff0c;同时&#xff0c;其透明度、柔性和薄型设计使其成为创新设计的理想选择。 本文将深入探讨OLED透…

从零做软件开发项目系列之九——项目结项

前言 一个项目&#xff0c;经过前期的需求调研分析&#xff0c;软件设计&#xff0c;程序开发&#xff0c;软件测试、系统部署、试运行系统调试等过程&#xff0c;最后到了项目的验收阶段&#xff0c;也就是项目生命周期的最后一个阶段&#xff0c;即项目结项&#xff0c;它涉…

什么是浏览器缓存(browser caching)?如何使用HTTP头来控制缓存?

聚沙成塔每天进步一点点 ⭐ 专栏简介⭐ 浏览器缓存和HTTP头控制缓存⭐ HTTP头控制缓存1. Cache-Control2. Expires3. Last-Modified 和 If-Modified-Since4. ETag 和 If-None-Match ⭐ 缓存策略⭐ 写在最后 ⭐ 专栏简介 前端入门之旅&#xff1a;探索Web开发的奇妙世界 记得点击…

什么是同源策略(same-origin policy)?它对AJAX有什么影响?

聚沙成塔每天进步一点点 ⭐ 专栏简介⭐ 同源策略&#xff08;Same-Origin Policy&#xff09;与 AJAX 影响⭐ 同源策略的限制⭐ AJAX 请求受同源策略影响⭐ 跨域资源共享&#xff08;CORS&#xff09;⭐ 写在最后 ⭐ 专栏简介 前端入门之旅&#xff1a;探索Web开发的奇妙世界 记…

rrpc:实现熔断与限流

一、服务端的自我保护&#xff08;实现限流&#xff09; 为什么需要限流器&#xff1f; 我们先看服务端&#xff0c;举个例子&#xff0c;假如我们要发布一个 Rrpc 服务&#xff0c;作为服务端接收调用端发送过来的请求&#xff0c;这时服务端的某个节点负载压力过高了&#x…

ARM DIY(五)摄像头调试

前言 今天&#xff0c;就着摄像头的调试&#xff0c;从嵌入式工程师的角度&#xff0c;介绍如何从无到有&#xff0c;一步一步地调出一款设备。 摄像头型号&#xff1a;OV2640 开发步骤 分为 2 个阶段 5 个步骤 阶段一&#xff1a; 设备树、驱动、硬件 阶段二&#xff1a; 应…

多线程使用HashMap,HashMap和HashTable和ConcurrentHashMap区别(面试题常考),硬盘IO,顺便回顾volatile

一、回顾&#x1f49b; 谈谈volatile关键字用法 volatile能够保证内存可见性&#xff0c;会强制从主内存中读取数据&#xff0c;此时如果其他线程修改被volatile修饰的变量&#xff0c;可以第一时间读取到最新的值。 二、&#x1f499; HashMap线程不安全没有锁,HashTable线程…

二分搜索树(Java 实例代码)

目录 二分搜索树 一、概念及其介绍 二、适用说明 三、二分查找法过程图示 四、Java 实例代码 src/runoob/binary/BinarySearch.java 文件代码&#xff1a; 二分搜索树 一、概念及其介绍 二分搜索树&#xff08;英语&#xff1a;Binary Search Tree&#xff09;&#xff…

20.液体加载特效

效果 源码 <!doctype html> <html> <head><meta charset="utf-8"><title>Milk | Liquid Loader Animation</title><link rel="stylesheet" href="style.css"> </head> <body><div cl…

【Flutter】Flutter 使用 Equatable 简化对象比较

【Flutter】Flutter 使用 Equatable 简化对象比较 文章目录 一、前言二、Equatable 简介三、为什么需要 Equatable&#xff1f;四、如何使用 Equatable五、Equatable 的其他特性六、完整的业务代码示例七、总结 一、前言 在 Flutter 开发中&#xff0c;我们经常需要比较对象的…

Python基础以及代码

Python基础以及代码 1.第一个代码如下&#xff1a; # 项目&#xff1a;第一个项目 # 作者&#xff1a;Adair # 开放时间&#xff1a; 2023/8/15 21:52print("Hello,world!!")如图所示&#xff1a; 2.数字的代码如下&#xff1a; # 项目&#xff1a;演示第一个项…

什么是Web组件(Web Components)?它们有哪些主要特点?

聚沙成塔每天进步一点点 ⭐ 专栏简介⭐ Web 组件&#xff08;Web Components&#xff09;⭐ Web 组件的主要特点1. 自定义元素&#xff08;Custom Elements&#xff09;2. Shadow DOM3. HTML 模板4. 封装性和重用性5. 生态系统6. 跨框架兼容性 ⭐ 写在最后 ⭐ 专栏简介 前端入门…

新能源技术是实现碳达峰碳中和的必然路径

在绿色经济发展的时代背景之下&#xff0c;光伏屋顶瓦顺势而生。集发电、环保功能于一体的光伏屋顶瓦&#xff0c;让每一栋建筑都能成为一座绿色发电站&#xff0c;实现建筑用电自给&#xff0c;有效降低建筑能耗&#xff0c;极大的推动生态建筑和生态城市的发展。 太阳能光伏瓦…