高性能服务器之Reactor设计

news2024/11/30 0:29:09

                                   

        今天来针对上一节课讲的多路转接知识再进一步进行设计,Reactor是基于epoll的ET模式设计的,在现在的高校和企业中是广泛应用的,今天我们来实现一个简洁版,完整版博主可没那个实力~

目录

基本原理

代码实现 

epoll_server.cc

Accepter.hpp

Reactor.hpp

Service.hpp

Sock.hpp

Util.hpp

Makefile

测试结果


基本原理

        Reactor负责接收事件并关心对应文件描述符读写事件,当epoll中就绪队列有数据时,把事件派发给连接管理器,Accepter调用对应的读写模块。

代码实现 

epoll_server.cc

#include "Reactor.hpp"
#include"Sock.hpp"
#include<stdlib.h>
#include<unistd.h>
#include"Accepter.hpp"
#include"Util.hpp"

static void Usage(std::string proc)
{
    std::cout << "Usage: " << "\n\t" << proc << " port" << std::endl;
}
int main(int argc, char* argv[])
{
    if(argc != 2)
    {
        Usage(argv[0]);
        exit(1);
    }
    //1、创建socket, 监听
    int listen_sock = Sock::Socket();
    SetNonBlock(listen_sock);//listen_sock设置为非阻塞
    Sock::Bind(listen_sock, (uint16_t)atoi(argv[1]));
    Sock::Listen(listen_sock);

    //2、创建Reactor对象
    //Reactor 反应堆模式: 通过多路转接方案, 被动的采用事件派发的方式, 去反向的调用对应的回调函数
    //1. 检测到事件 -- epoll
    //2. 派发事件 - Dispatcher (事件派发 + IO) + 业务处理: 半同步半异步的处理方式
    //3. 链接 -- accepter
    //4. IO -- recver, sender
    Reactor* R = new Reactor();
    R->InitReactor();

    //3、 给Reactor反应堆中加柴火
    //3.1 有柴火
    Event* evp = new Event;
    evp->sock = listen_sock;
    evp->R = R; //指向柴火加的反应堆

    
    //Accepter链接管理器
    evp->RegisterCallBack(Accepter, nullptr, nullptr);

    //3.2 将准备好的柴火放入反应堆Reactor中
    R->InsertEvent(evp, EPOLLIN | EPOLLET); //设置ET模式

    //4. 开始进行事件派发
    for(; ;)
    {
        R->Dispatcher(1000); //事件派发
    }
    return 0;
}

Accepter.hpp

#pragma once

#include<iostream>
#include"Reactor.hpp"
#include"Sock.hpp"
#include"Service.hpp"
#include"Util.hpp"

int Accepter(Event* evp)
{
    std::cout << "有新的链接到来了, 就绪的sock是: " << evp->sock << std::endl;
    //经过这样的死循环,会不断的把链接构建成event结构添加到反应堆中
    while(true)
    {
        int sock = Sock::Accept(evp->sock);
        if(sock < 0)
        {
            std::cout << "Accept Done!" << std::endl;
            break;
        }
        std::cout << "Accept success: " << sock << std::endl;
        SetNonBlock(sock);
        //获取链接成功, IO socket
        Event* other_ev = new Event();
        other_ev->sock = sock;
        other_ev->R = evp->R; //为什么要让所有的Event指向自己所属的Reactor??

        //recver sender error 就是我们代码中的较顶层, 只负责读取!
        other_ev->RegisterCallBack(Recver, Sender, Error);

        //不能同时设置EPOLLOUT和EPOLLIN, 这样服务器会一直死循环,服务器性能大大降低
        evp->R->InsertEvent(other_ev, EPOLLIN|EPOLLET); 
    }
    return 0;
}

Reactor.hpp

#pragma once

#include<iostream>
#include<string>
#include<sys/epoll.h>
#include<unordered_map>
#include<stdlib.h>
#include<unistd.h>


//一般处理IO的时候, 我们只有三种接口需要处理
//处理写入
//处理读取
//处理异常

class Event; //声明
class Reactor;
#define SIZE 128
#define NUM 64

typedef int (*callback_t)(Event* ev); //函数指针类型

//需要让epoll管理的基本结点
class Event
{
public:
    int sock; //对应的文件描述符
    std::string inbuffer;//对应的sock, 对应的输入缓冲区->粘包问题解决
    std::string outbuffer; //对应的sock, 对应的输入缓冲区->epoll发送出去

    //sock设置回调
    callback_t recver;
    callback_t sender;
    callback_t errorer;

    //设置Event回指Reactor的指针
    Reactor* R;
public:
    Event()
    {
        sock = -1;
        recver = nullptr;
        sender = nullptr;
        errorer = nullptr;
        R = nullptr;
    }
    ~Event()
    {}
    //设置回调
    void RegisterCallBack(callback_t _recver, callback_t _sender, callback_t _errorer)
    {
        recver = _recver;
        sender = _sender;
        errorer = _errorer;
    }
};

//不需要关心任何sock的类型(listen, 读, 写)
//如何进行使用该类, 对Event进行管理
class Reactor
{
private:
    int epfd;
    std::unordered_map<int, Event*> events; //我的Epoll类管理的所有的Event的集合
public:
    Reactor()
        :epfd(-1)
    {}
    ~Reactor()
    {}
    void InitReactor()
    {
        epfd = epoll_create(SIZE);
        if(epfd < 0)
        {
            std::cerr << "epoll_create error" << std::endl;
            exit(2);
        }
        std::cout << "InitReactor success" << std::endl;
    }
    //增加
    bool InsertEvent(Event* evp, uint32_t evs)
    {
        //1、将sock中的sock插入到epoll中
        struct epoll_event ev;
        ev.events = evs;
        ev.data.fd = evp->sock;
        if(epoll_ctl(epfd, EPOLL_CTL_ADD, evp->sock, &ev) < 0) //-1失败, 0成功
        {
            std::cerr << "epoll_ctl add event failed" << std::endl;
            return false;
        }
        //2、将ev本身插入到unordered_map中
        events.insert({evp->sock, evp});//evp是维护的一个Event结点
        return true;
    }
    void DeleteEvent(Event* evp)
    {
        int sock = evp->sock;
        auto iter = events.find(sock);
        if(iter != events.end())//找到了
        {
            //1、将sock中的sock从epoll中删除
            epoll_ctl(epfd, EPOLL_CTL_DEL, sock, nullptr);//删除不关心events事件,所以设置成nullptr

            //2. 把特定的ev 从unordered_map中移除
            events.erase(iter);

            //3、close
            close(sock);

            //4、删除event结点 -->它是new出来的, 要手动释放
            delete evp;
        }
    }

    //关于修改, 也是最后看, 使能读写
    bool EnableRW(int sock, bool enbread, bool enbwrite)
    {
        struct epoll_event ev;
        ev.events = EPOLLET | (enbread ? EPOLLIN : 0) | (enbwrite ? EPOLLOUT : 0);
        ev.data.fd = sock;

        if(epoll_ctl(epfd, EPOLL_CTL_MOD, sock, &ev) < 0)
        {
            std::cerr << "epoll_ctl mod event failed" << std::endl;
            return false;
        }
        return true;
    }

    //判断sock是否是合法的
    bool IsSockOK(int sock)
    {
        auto iter = events.find(sock);
        return iter != events.end(); //找到就合法
    }

    //就绪事件的派发逻辑
    void Dispatcher(int timeout) //将就绪事件派发给sock
    {
        struct epoll_event revs[NUM];
        int n = epoll_wait(epfd, revs, NUM, timeout);
        for(int i = 0; i < n; i++) //n 表示就绪的fd个数, 维护在了数组里面
        {
            int sock = revs[i].data.fd;
            uint32_t revents = revs[i].events; //本次文件描述符就绪的事件
            //代表差错处理, 将所有的错误问题全部转化成为让IO函数去解决
            if(revents & EPOLLERR)
            {
                revents |= (EPOLLIN | EPOLLOUT); //设置读写
            }
            if(revents & EPOLLHUP) //对端链接关闭
            {
                revents |= (EPOLLIN | EPOLLOUT); //设置读写
            }
            //读数据就绪, 可能有bug,后面解决
            if(revents & EPOLLIN)
            {
                //直接调用回调方法, 执行对应的读取
                if(IsSockOK(sock) && events[sock]->recver) //recever是回调
                {
                    events[sock]->recver(events[sock]);
                }
            }
            if(revents & EPOLLOUT)
            {
                //直接调用回调方法, 执行对应的读取
                if(IsSockOK(sock) && events[sock]->sender)
                {
                    events[sock]->sender(events[sock]);
                }
            }
        }
    }
};

Service.hpp

#pragma once
#include "Reactor.hpp"
#include <vector>
#include <errno.h>
#include"Util.hpp"

#define ONCE_SIZE 128

//1: 本轮读取全部完成
//-1: 读取出错
//0: 对端关闭链接
static int RecverCore(int sock, std::string &inbuffer)
{
    while (true)
    {
        char buffer[ONCE_SIZE];
        ssize_t s = recv(sock, buffer, ONCE_SIZE - 1, 0);
        if (s > 0)
        {
            //读取成功
            buffer[s] = 0;
            inbuffer += buffer;
        }
        else if (s < 0)
        {
            if(errno == EINTR)
            {
                //IO被信号打断, 概率特别低
                continue;
            }
            //errno会被自动设置
            if (errno == EAGAIN || errno == EWOULDBLOCK)
            {
                // 1、读完, 底层没数据了
                return 1; //success
            }
            // 2、真的出错了
            return -1;
        }
        else // s == 0
        {
            return 0;
        }
    }
}

int Recver(Event *evp)
{
    std::cout << "Recver been called" << std::endl;
    // 1、真正的读取
    int result = RecverCore(evp->sock, evp->inbuffer);
    if(result <= 0)
    {
        //差错处理
        if(evp->errorer)
        {
            evp->errorer(evp);
        }
        return -1;
    }
    //1+2X2+3X5+6X
    // 2、分包 -- 一个或者多个报文 -- 解决粘包问题
    std::vector<std::string> tokens;
    std::string sep = "X";
    SplitSegment(evp->inbuffer, &tokens, sep); //从缓冲区读取字节到tokens

    // 3、反序列化 -- 针对一个报文 提取有效参与计算或者存储的信息
    for (auto &seg : tokens)
    {
        std::string data1, data2;
        if(Deserialize(seg, &data1, &data2)) //就是和业务强相关了
        {
            // 4、业务逻辑 -- 得到结果
            int x = atoi(data1.c_str());
            int y = atoi(data2.c_str());
            int z = x + y;
            // 5、构建响应 -- 添加到evp->outbuffer!!
            // 1+2X ---> 1+2=3X
            std::string res = data1;
            res += "+";
            res += data2;
            res += "=";
            res += std::to_string(z);
            res += sep;

            //send?? 不能直接send!!!
            evp->outbuffer += res; //把数据写入自己维护的缓冲区
        }
    }

    // 6、尝试直接/间接进行发送 -- 后续说明
    //必须条件成熟了(写事件就绪), 你才能发送呢??
    //一般只要将报文处理完毕, 才需要发送
    //写事件一般都是就绪的, 但是用户不一定是就绪的!
    //对于写事件, 我们通常是按需设置!!
    if(!(evp->outbuffer).empty())
    {
        //写打开的时候, 默认就是就绪的, 即便是发送缓冲区已经满了
        //epoll 只要用户重新设置了OUT事件, EPOLLOUT至少会触发一次
        evp->R->EnableRW(evp->sock, true, true); //读写使能开启
    }
    return 0;
}

//1: 数据全部发完
//0: 数据没有发完, 但是不能再发了
//-1:发送失败
int SenderCore(int sock, std::string& outbuffer)
{
    while(true)
    {
        int total = 0; //本轮累计发送的数据量
        const char* start = outbuffer.c_str();
        int size = outbuffer.size();
        ssize_t curr = send(sock, start + total, size - total, 0);
        if(curr > 0)
        {
            total += curr;
            if(total == size)
            {
                //全部将数据发送完成
                outbuffer.clear();
                return 1;
            }
        }
        else
        {
            //数据没有发送完成, 但是不能在发送了
            if(errno == EINTR) //IO信号中断
            {
                continue;
            }
            if(errno == EAGAIN || errno == EWOULDBLOCK) //发送缓冲区满了
            {
                outbuffer.erase(0, total);
                return 0;
            }
            return -1;
        }
    }
}
int Sender(Event *evp)
{
    std::cout << "Sender been called" << std::endl;
    //1: 数据全部发完
    //0: 数据没有发完, 但是不能再发了
    //-1:发送失败
    int result = SenderCore(evp->sock, evp->outbuffer);
    if(result == 1)
    {
        evp->R->EnableRW(evp->sock, true, false); //按需设置
    }
    else if(result == 0)
    {
        //可以什么也不做
        evp->R->EnableRW(evp->sock, true, false);
    }
    else
    {
        if(evp->errorer)
        {
            evp->errorer(evp); //errorer统一处理差错
        }
    }
    return 0;
}

int Error(Event *evp)
{
    std::cout << "Error been called" << std::endl;
    evp->R->DeleteEvent(evp);
    return 0;
}

Sock.hpp

#pragma once
#include <iostream>
#include <string>
#include <string.h>
#include <stdlib.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>

using namespace std;

class Sock
{
public:
    static int Socket()
    {
        int sock = socket(AF_INET, SOCK_STREAM, 0); // UDP
        if (sock < 0)
        {
            cerr << "socket err" << endl;
            exit(2);
        }
        return sock;
    }
    static void Bind(int sock, uint16_t port)
    {
        struct sockaddr_in local;
        memset(&local, 0, sizeof(local));
        local.sin_family = AF_INET;
        local.sin_port = htons(port);
        local.sin_addr.s_addr = INADDR_ANY; //服务端,ip地址

        if (bind(sock, (struct sockaddr *)&local, sizeof(local)) < 0)
        {
            cerr << "bind error" << endl;
            exit(3);
        }
    }

    static void Listen(int sock)
    {
        if (listen(sock, 5) < 0)
        {
            cerr << "listen error" << endl;
            exit(4);
        }
    }

    static int Accept(int sock)
    {
        struct sockaddr_in peer;
        socklen_t len = sizeof(peer);
        int fd = accept(sock, (struct sockaddr *)&peer, &len);
        if(fd >= 0)
        {
            return fd;
        }
        return -1;
    }

    static void Connect(int sock, string ip, uint16_t port)
    {
        struct sockaddr_in server;
        memset(&server, 0, sizeof(server));

        server.sin_family = AF_INET;
        server.sin_port = htons(port);
        server.sin_addr.s_addr = inet_addr(ip.c_str());//字符串->整型,大小端解决了
        //inet_ntoa 整型->字符串

        if(connect(sock, (struct sockaddr*)&server, sizeof(server)) == 0)
        {
            cout << "Connect Success" << endl;
        }
        else
        {
            cout << "Connect Failed" << endl;
            exit(5); 
        }
    }
};

Util.hpp

#pragma once
#include<iostream>
#include<unistd.h>
#include<fcntl.h>


//工具类
//设置一个sock成为非阻塞
void SetNonBlock(int sock)
{
    int f1 = fcntl(sock, F_GETFL);
    if(f1 < 0)
    {
        std::cerr << "fcntl failed" << std::endl;
        return;
    }
    fcntl(sock, F_SETFL, f1|O_NONBLOCK);//设置非阻塞
}

//1+2X2+3X5+6X
void SplitSegment(std::string& inbuffer, std::vector<std::string>* tokens, std::string sep)
{
    while(true)
    {
        std::cout << "inbuffer: " << inbuffer << std::endl; //查看缓冲区里还有什么
        auto pos = inbuffer.find(sep);
        if(pos == std::string::npos) //没有找到
        {
            break;
        }
        std::string sub = inbuffer.substr(0, pos); //[ )
        tokens->push_back(sub);
        inbuffer.erase(0, pos + sep.size()); //从0开始, 移除pos+sep.size()个
    }
}
bool Deserialize(const std::string& seg, std::string* out1, std::string* out2) //就是和业务强相关
{
    //1+2
    std::string op = "+";
    auto pos = seg.find("+");
    if(pos == std::string::npos) //没找到
    {
        return false;
    }
    *out1 = seg.substr(0, pos);
    *out2 = seg.substr(pos+op.size());
    return true;
}

Makefile

epoll_server:epoll_server.cc
	g++ -o $@ $^ -std=c++11
.PHONY:clean
clean:
	rm -f epoll_server

测试结果

 看到这里, 给博主点个赞吧~

                  

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/3207.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

当面试官让我回答React和Vue框架的区别......

我们为什么需要错误边界 在React组件中可能会由于某些JavaScript错误&#xff0c;导致一些无法追踪的错误&#xff0c;导致应用崩溃。部分 UI 的 JavaScript 错误不应该导致整个应用崩溃。为此&#xff0c;React引入了错误边界(Error Boundary)的概念&#xff1a;可以捕获发生…

MySQL搭建主从复制流程及相关问题

目录一、关于主从复制1.1 关于主从复制1.2 应用场景1.3 优缺点1.4 原理二、配置主从复制2.1 同步各个服务器的时间2.2 修改主库&#xff08;M1&#xff09;配置2.3 主库&#xff08;M1&#xff09;为从库&#xff08;S1\S2&#xff09;增加账号2.3 查看主库&#xff08;M1&…

欢迎女神科学家颜宁回国,并祝她如愿以偿

目录1、女神科学家颜宁是谁2、颜宁在深圳人才论坛最新演讲&#xff0c;以及招聘邮箱3、颜宁微博回应4、结论与展望最近女神科学家颜宁回国了&#xff0c;整个科学界和中国都沸腾了&#xff0c;也上了热搜&#xff0c;成了热门话题&#xff0c;越来越多的海归精英选择回国 1、…

Python 和Java 哪个更适合做自动化测试?

很多小伙伴在功能测试行业工作了2、3年后&#xff0c;发现自己已经把功能测试做的非常好了&#xff0c;已经到职业发展和薪资发展的瓶颈期了&#xff0c;就想着学点东西&#xff0c;提升一下技能。 而对于功能测试升级来说&#xff0c;一般有这么3个主流的发展方向&#xff1a;…

事件/边沿检测--上升沿检测、下降沿检测

检测上升沿&#xff1a;&#xff08;从低到高的跳变 __| ) input sig_a; reg sig_a_d1; wire sig_a_risedge; alaways (posedge clk or negedge rstb) begin if(!rstb) sig_a_d1 < 1b0; else sig_a_d1 < sig_a; end assign sig_a_risedge sig_a & !sig_a_d1; …

【02】概率图模型在真实世界中的应用案例

概率图模型在真实世界中的应用案例 概率图模型有许多不同的实际应用。 为了激起大家对概率图模型的兴趣&#xff0c;也为了让大家能够对概率图模型有感性的认知&#xff0c;本章我会分享概率图模型的诸多实际应用案例。 文章目录图像中的概率模型图像生成图像修复图像降噪语言…

【Python百日进阶-WEB开发】Day171 - Django案例:03配置工程日志

文章目录八、配置工程日志8.1 目的和原因8.2 配置工程日志的步骤8.2.1 配置工程日志8.2.2 准备日志文件目录8.2.3 日志器记录器的使用8.2.4 Git管理工程日志九、配置前端静态文件9.1 准备静态文件9.2 指定静态文件的加载路径十、相关文档八、配置工程日志 8.1 目的和原因 目的…

什么是跨域?以及解决方案

现在的web项目&#xff0c;很多都是前后端分离&#xff0c;特别容易出现跨域问题 那么什么是跨域问题呢?本篇文章带你彻底从本质上弄明白什么是跨域问题以及如何解决 一、跨域有什么现象 首先我们看一下现象&#xff0c;如何出现的跨域问题。例&#xff1a; 前段&#xff1a…

【Linux】基础IO —— 上

&#x1f387;Linux&#xff1a;基础IO详解 博客主页&#xff1a;一起去看日落吗分享博主的在Linux中学习到的知识和遇到的问题博主的能力有限&#xff0c;出现错误希望大家不吝赐教分享给大家一句我很喜欢的话&#xff1a; 看似不起波澜的日复一日&#xff0c;一定会在某一天让…

Web渗透测试攻防之浅述信息收集

前言 众所周知渗透测试的本质是信息收集&#xff0c;在渗透测试中信息收集的质量直接关系到渗透测试成果的与否。在对系统进行渗透测试前的信息收集是通过各种方式获取所需要的信息&#xff0c;收集的信息越多对目标进行渗透的优势越有利。通过利用获取到的信息对系统进行渗透…

Java Spring Cloud XVIII 之 Kafka I

Java Spring Cloud XVIII 之 Kafka I Kafka 1.Kafka简介 Kafka是由Apache软件基金会开发的一个开源流处理平台&#xff0c;由Scala和Java编写。该项目的目标是为处理实时数据提供一个统一、高吞吐、低延迟的平台。Kafka最初是由LinkedIn开发&#xff0c;并随后于2011年初开源…

C++ 类和对象 (中)

作者&#xff1a;小萌新 专栏&#xff1a;C初阶 作者简介&#xff1a;大二学生 希望能和大家一起进步 本篇博客目标&#xff1a;梳理自己六个小时学到的知识 并且将类和对象知识分享给大家 专注的去做一件事 如果累了就去休息 C 类和对象 中本章学习目标前言一. 构造函数1.1 概…

破解系统密码与重装windows系统

数据来源 一、利用5次shift漏洞破解win7密码 1.1 漏洞 1. 在未登录时&#xff0c;连续按5次shift键&#xff0c;弹出程序C:\Windows\System32\sethc.exe 2. 部分win7及win10系统在未进入系统时&#xff0c;可以通过系统修复漏洞篡改系统文件名&#xff01; 注意&#xff1a;…

使用Maven部署到远程Linux服务器Tomcat

一、安装JDK 首先给服务器安装jdk&#xff0c;访问官网下载&#xff1a;Java Downloads | Oracle&#xff0c;下载图中的版本。首先我使用的是tomcat10&#xff0c;最低支持jdk1.8。安装了jdk19&#xff0c;是当时的最新版实测tomcat开启失败&#xff0c;新版jdk也不自带jre&a…

Windows上使用QEMU创建aarch64(ARM64)虚拟机

前言 随着国产化的推进&#xff0c;现在采用ARM、MIPS的机器越来越多&#xff0c;作为开发、运维人员要调测软件总不能每种架构的机器都去买一台吧&#xff1f;主要像博主这样的穷B&#xff0c;实在也是承受不起。。 需要的工具 1、QEMU Windows版官网下载地址&#xff1a;…

软件测试最最最重要的事

软件测试用例得出软件测试用例的内容&#xff0c;其次&#xff0c;按照软件测试写作方法&#xff0c;落实到文档中&#xff0c;两者是形式和内容的关系&#xff0c;好的测试用例不仅方便自己和别人查看&#xff0c;而且能帮助设计的时候考虑的更周。 一个好的测试用例必须包含…

Articulate360在线学习课件制作工具

Articulate是一款全新理念的在线和移动学习课件制作工具&#xff0c;可以说是目前国际上用户最广泛的e-learning课件制作工具之一。它包含了全新版的Storyline 360和Rise 360以及大量其他创作应用程序。使用Storyline 360开发可在所有设备上运行的自定义交互式课程&#xff0c;…

Java日志框架的发展历史,你不想了解一下吗

前言 相信大家在项目开发中肯定遇到过log4j&#xff0c;JUL&#xff0c;slf4j&#xff0c;logback&#xff0c;log4j2等日志框架相关名词&#xff0c;这些日志框架之间到底有什么关系&#xff0c;Java日志框架究竟经历了什么样的发展历程&#xff0c;相信有很多人都对此充满了好…

Socket 编程基础

文章目录一、socket 简介二、socket 编程接口介绍1. socket()函数2. bind()函数3. listen()函数4. accept()函数5. connect()函数6. 发送和接收函数read()函数recv()函数write()函数send()函数7. close()关闭套接字三、IP 地址格式转换函数inet_pton()函数inet_ntop()函数本篇会…

决策树与随机森林在分类预测中的应用(附源码)

写在前面 今天给大家分享一下基于决策树和随机森林在乳腺癌分类中实战。决策树和随机森林是白盒模型&#xff0c;数学建模中常用到这两种模型&#xff0c;用于预测或分类&#xff0c;随机森林还可以进行特征选择&#xff0c;故很推荐大家学习&#xff01;&#xff01;&#xff…