Reactor的概念

news2024/12/24 20:21:12

一、Reactor的概念

​ Reactor模式是一种事件驱动模式,由一个或多个并发输入源(input),一个消息分发处理器(Initiation Dispatcher),以及每个消息对应的处理器(Request Handler)构成。

​ Reactor在结构上类似于 消费者模式 ,但Reactor与消费者模式不同的是,它并没有像消费者模式一样使用Queue来做缓存,而是当一个Event 输入到 Initiation Dispatcher 之后,该Initiation Dispatcher 会根据 Event 的类型 ,分发给对应的 Request Handler 。
在这里插入图片描述

二、Reactor的优缺点

优点
​ 在Reactor An Object Behavioral Pattern for Demultiplexing and Dispatching Handles for Synchronous Events中是这么说的:

​ 1.Separation of concerns: The Reactor pattern decouples application-independent demultiplexing and dispatching mechanisms from application-specific hook method functionality. The application-independent mechanisms become reusable components that know how to demultiplex events and dispatch the appropriate hook methods defined by Event Handlers. In contrast, the application-specific functionality in a hook method knows how to perform a particular type of service.

​ 2.Improve modularity, reusability, and configurability of event-driven applications: The pattern decouples application functionality into separate classes. For instance, there are two separate classes in the logging server: one for establishing connections and another for receiving and processing logging records. This decoupling enables the reuse of the connection establishment class for different types of connection-oriented services (such as file transfer, remote login, and video-on-demand). Therefore, modifying or extending the functionality of the logging server only affects the implementation of the logging handler class.

​ 3.Improves application portability: The Initiation Dispatcher’s interface can be reused independently of the OS system calls that perform event demultiplexing. These system calls detect and report the occurrence of one or more events that may occur simultaneously on multiple sources of events. Common sources of events may in- clude I/O handles, timers, and synchronization objects. On UNIX platforms, the event demultiplexing system calls are called select and poll [1]. In the Win32 API [16], theWaitForMultipleObjects system call performs event demultiplexing.

​ 4.Provides coarse-grained concurrency control: The Reactor pattern serializes the invocation of event handlers at the level of event demultiplexing and dispatching within a process or thread. Serialization at the Initiation Dispatcher level often eliminates the need for more complicated synchronization or locking within an application process.

注:上面这些我并没有详细去了解是否是真的这么高效,但我觉得其中最有用的是 解决线程的切换、同步、数据的移动会引起性能问题。也就是说从性能的角度上,它最大的提升就是减少了性能的使用,即不需要每个Client对应一个线程。

缺点
​ 1.相比传统的简单模型,Reactor增加了一定的复杂性,因而有一定的门槛,并且不易于调试。

​ 2.Reactor模式在IO读写数据时还是在同一个线程中实现的,即使使用多个Reactor机制的情况下,那些共享一个Reactor的Channel如果出现一个长时间的数据读写,会影响这个Reactor中其他Channel的相应时间,比如在大文件传输时,IO操作就会影响其他Client的相应时间,因而对这种操作,使用传统的Thread-Per-Connection或许是一个更好的选择,或则此时使用Proactor模式。

三、Poco Reactor模式

​ 根据上面对 Reactor 的了解,Poco也根据Reactor模式封装了 SocketReactor 。

SocketReactor

​ 在Poco的 Reactor模式中,SocketReactor 充当着 Initiation Dispatcher 的角色,根据Event事件,分发至对应的Handler中执行。

​ 根据调用 addEventHandler ,向Reactor 中,指定socket, 添加 监听的 EventNotification,同时添加 处理该Event的方法 。

​ 调用removeEventHandler 以移除之前添加的监听。

​ 通过 setTimeout 去设置 I/O select 的超时时间,当超过指定时间内都没有收到消息时,则判定为超时,停止 select 方法。

SocketNotification

​ 在 addEventHandler 中,需要设置 EventNotification ,其中EventNotification 指的是 SocketNotification ,Poco的Notification包含以下:

ReadableNotification:

监听 socket 是否变为可读的通知

WritableNotification:

监听 socket 是否在写入的通知

ErrorNotification:

监听 socket 是否发生错误的通知

TimeoutNotification:

在所有的event处理方法中都会注册该监听通知,实现该通知的方法必须重写 onTimeout 方法去实现超时的处理。

IdleNotification:

当 SocketReactor 中没有 socket 进行 Socket :: select() ,则触发事件

ShutdownNotification:

监听socket关闭的通知

关系图

在这里插入图片描述

reactor的简单实现代码

//Header File
#include <Poco/Runnable.h>
#include <Poco/Net/SocketReactor.h>
#include "Poco/Net/SocketNotification.h"
#include <Poco/Net/WebSocket.h>
#include <Poco/Net/HTTPClientSession.h>
#include <Poco/Net/HTTPRequest.h>
#include <Poco/Net/HTTPResponse.h>
#include <Poco/Net/ServerSocket.h>
class ReactorTest : public Poco::Runnable
{
public:
    ReactorTest(string host, uint32_t port);
    ~ReactorTest();

    bool start();
    bool stop();

protected:
	// override from Runnable
    void run();
    // override from SocketReactor
    void onReadable(Poco::Net::ReadableNotification* pNf);
    void onTimeout(Poco::Net::TimeoutNotification* pTf);

private:
    WebSocket*  _ws;

    char _buffer[1024];
    int _flags;
    int _reBit;
    std::string _payload;
    string _host;
    uint32_t _port;

    Poco::Thread _thread;
    std::unique_ptr<Poco::Net::SocketReactor> _reactor;
    std::unique_ptr<Poco::Timestamp> _timer;
    Poco::Timestamp _time;

    const int _reactorTimeout = 50000; //50ms check ticket
 };
//Cpp File
#include "ReactorTest.h"
#include "Poco/Observer.h"
ReactorTest::ReactorTest(string host, uint32_t port) :_host(host), _port(port)
{

}

ReactorTest::~ReactorTest()
{
    stop();
}

bool ReactorTest::start()
{
    try
    {
        const std::string urlPrefix("ws://");   //Need to change to TLS soon after
        Poco::URI  restful_host(urlPrefix + _host);
        HTTPClientSession cs(restful_host.getHost(), _port);
        HTTPRequest request(HTTPRequest::HTTP_GET, "/", "HTTP/1.1");
        HTTPResponse response;
        _ws = new WebSocket(cs, request, response); // Causes the timeout
        if (_callback != nullptr)
        {
            _callback->Connected();
        }
    }
    catch (Exception ex)
    {
      
    }

    if (_ws != nullptr)
    {
        _reactor.reset(new SocketReactor());
        _reactor->setTimeout(Poco::Timespan(_reactorTimeout));
        _reactor->addEventHandler(*_ws, Poco::Observer<MaxWebSocket, Poco::Net::ReadableNotification>(*this, &MaxWebSocket::onReadable));
        _reactor->addEventHandler(*_ws, Poco::Observer<MaxWebSocket, Poco::Net::TimeoutNotification>(*this, &MaxWebSocket::onTimeout));
        _thread.start(*this);
        _time.update();
        return true;
    }
    else
    {
        return false;
    }
   
}

bool ReactorTest::stop()
{
    if (_reactor != nullptr)
    {
        _reactor->removeEventHandler(*_ws, Poco::Observer<MaxWebSocket, Poco::Net::ReadableNotification>(*this, &MaxWebSocket::onReadable));
        _reactor->removeEventHandler(*_ws, Poco::Observer<MaxWebSocket, Poco::Net::TimeoutNotification>(*this, &MaxWebSocket::onTimeout));
        _reactor->stop();
        _thread.join(800);
        _ws->shutdown();
        _reactor = nullptr;
        return true;
    }
    else
    {
        return false;
    }
}

void ReactorTest::run()
{
    _reactor->run();
}
void  ReactorTest::onReadable(Poco::Net::ReadableNotification* pNf)
{
    poco_assert_dbg(pNf->socket() == *_ws);
    pNf->release();
    _reBit = _ws->receiveFrame(_buffer, sizeof(_buffer), _flags);
    if (_reBit < 0)
    {
        return;
    }
    _buffer[_reBit] = '\0';
    std::string resp(_buffer);

    //TODO: Parse json
}
void  ReactorTest::onTimeout(Poco::Net::TimeoutNotification* pTf)
{
    pTf->release();
   //TODO: Timeout handle 。It can also be executed Heartbeat
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/686875.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Linux安装nodejs

一、下载包 https://registry.npmmirror.com/binary.html?pathnode/ 比如&#xff1a;10.9.0 https://registry.npmmirror.com/binary.html?pathnode/v10.9.0/ 按需下载 https://registry.npmmirror.com/-/binary/node/v10.9.0/node-v10.9.0-linux-x64.tar.gz 二、上传到…

使用Nginx+Lua实现自定义WAF(Web application firewall)

转载https://github.com/unixhot/waf WAF 使用NginxLua实现自定义WAF&#xff08;Web application firewall&#xff09; 功能列表&#xff1a; 支持IP白名单和黑名单功能&#xff0c;直接将黑名单的IP访问拒绝。 支持URL白名单&#xff0c;将不需要过滤的URL进行定义。 支持…

解析vcruntime140.dll文件,缺失了要怎么去修复?

在计算机的世界中&#xff0c;vcruntime140.dll是一个重要的动态链接库文件。然而&#xff0c;有时候这个文件可能会引发一系列问题&#xff0c;影响应用程序的正常运行。如果你缺少了vcruntime140.dll&#xff0c;那么你的程序就会打不开&#xff0c;今天我们一起来聊聊vcrunt…

408数据结构第四章

串 定义存储结构模式匹配 小题形式考&#xff0c;比较简单&#xff0c;拿两个题来练手就会了 定义 字符串简称串 由零个或多个字符组成的有限序列 S是串名n称为串的长度&#xff0c;n0称为空串 串中多个连续的字符组成的子序列称为该串的子串 串的逻辑结构和线性表极为相似&am…

ByteHouse+Apache Airflow:高效简化数据管理流程

Apache Airflow 与 ByteHouse 相结合&#xff0c;为管理和执行数据流程提供了强大而高效的解决方案。本文突出了使用 Apache Airflow 与 ByteHouse 的主要优势和特点&#xff0c;展示如何简化数据工作流程并推动业务成功。 主要优势 可扩展可靠的数据流程&#xff1a;Apache Ai…

使用MASA Stack+.Net 从零开始搭建IoT平台 第五章 使用时序库存储上行数据

目录 前言分析实施步骤时序库的安装解决playload没有时间戳问题代码编写 总结 前言 我们可以将设备上行数据存储到关系型数据库中&#xff0c;我们需要两张带有时间戳的表&#xff08;最新数据表 和 历史数据表&#xff09;&#xff0c;历史数据表存储所有设备上报的数据&…

iptables详解

iptables简介 netfilter/iptables&#xff08;简称为iptables&#xff09;组成Linux平台下的包过滤防火墙&#xff0c;完成封包过滤、封包重定向和网络地址转换&#xff08;NAT&#xff09;等功能。 iptables 规则&#xff08;rules&#xff09;其实就是网络管理员预定义的条…

神通数据库X86架构适配DJANGO317指南

制作神通数据库镜像 1&#xff09;、下载docker.io/centos:7.9.2009镜像&#xff0c;docker pull docker.io/centos:7.9.2009 2)、运行一个容器&#xff0c;docker run -itd --name shentong -p 2003:2003 --privilegedtrue --restartalways -v /sys/fs/cgroup:/sys/fs/cgrou…

万字详解JavaScript手写一个Promise

目录 前言Promise核心原理实现 Promise的使用分析MyPromise的实现在Promise中加入异步操作 实现then方法的多次调用 实现then的链式调用 then方法链式调用识别Promise对象自返回 捕获错误及 then 链式调用其他状态代码补充 捕获执行器错误捕获then中的报错错误与异步状态的链式…

硬盘设备出现“设备硬件出现致命错误,导致请求失败”怎么办?

当我们尝试访问或打开计算机上的硬盘设备&#xff0c;有时候会出现“设备硬件出现致命错误&#xff0c;导致请求失败”的错误提示&#xff0c;这该怎么办呢&#xff1f;下面我们就来了解一下。 出现“设备硬件出现致命错误&#xff0c;导致请求失败”错误的原因有哪些&#xff…

机器学习之SVM支持向量机

目录 经典SVM 软间隔SVM 核SVM SVM分类器应用于人脸识别 SVM优点 SVM缺点 经典SVM 支持向量机&#xff08;Support Vector Machine&#xff0c;SVM&#xff09;是一种二分类模型&#xff0c;其基本思想是在特征空间中找到一个最优的超平面&#xff0c;使得正负样本点到…

数据结构 队列(C语言实现)

绪论 任其事必图其效&#xff1b;欲责其效&#xff0c;必尽其方。——欧阳修&#xff1b;本篇文章主要写的是什么是队列、以及队列是由什么组成的和这些组成接口的代码实现过程。&#xff08;大多细节的实现过程以注释的方式展示请注意查看&#xff09; 话不多说安全带系好&…

Python3,关于请求重试,这次requests库给安排的明明白白。

requests库重试请求 1、引言2、requests库2.1 安装2.2 代码实例2.2.1 重试次数设置2.2.2 重试条件设置2.2.3 超时时间设置 3、总结 1、引言 小屌丝&#xff1a;鱼哥&#xff0c; 你看这是啥&#xff1f; 小鱼&#xff1a;我瞅瞅… 小屌丝&#xff1a;鱼哥&#xff0c;你这眼神…

【计算机视觉】Fast Segment Anything 安装步骤和示例代码解读(含源代码)

文章目录 一、导读二、安装步骤2.1 将存储库克隆到本地2.2 创建 conda 环境2.3 安装软件包2.4 安装 CLIP2.5 下载权重文件2.6 开始使用2.6.1 Everything mode2.6.2 Text prompt2.6.3 Box prompt (xywh)2.6.4 Points prompt 三、示例代码 一、导读 论文地址&#xff1a; https:…

服务器配置与操作

服务器配置与操作 一、连接远程服务器 推荐用xshell 或者 finalshell 或者 winSCP 或者 FileZilla xshell下载地址&#xff1a;https://xshell.en.softonic.com/ 二、服务器配置 2.1 安装JDK 2.1 方法一&#xff1a;在线安装 yum list java* yum -y install java-1.8.0-ope…

【Django | 爬虫 】收集某吧评论集成舆情监控(附源码)

&#x1f935;‍♂️ 个人主页: 计算机魔术师 &#x1f468;‍&#x1f4bb; 作者简介&#xff1a;CSDN内容合伙人&#xff0c;全栈领域优质创作者。 文章目录 一、爬取帖子、二级评论二、构建数据表三、并入项目1. spider代码2. view视图代码3. 优化后台界面3. urls路由 四、定…

第二十三章Java二维数组详解

一、创建二维数组 在 Java 中二维数组被看作数组的数组&#xff0c;即二维数组为一个特殊的一维数组&#xff0c;其每个元素又是一个一维数组。Java 并不直接支持二维数组&#xff0c;但是允许定义数组元素是一维数组的一维数组&#xff0c;以达到同样的效果。声明二维数组的语…

编程规范-控制流程、错误和异常处理

前言&#xff1a; \textcolor{Green}{前言&#xff1a;} 前言&#xff1a; &#x1f49e;这个专栏就专门来记录一下寒假参加的第五期字节跳动训练营 &#x1f49e;从这个专栏里面可以迅速获得Go的知识 今天的笔记是对编程规范的补充&#xff0c;对控制流程、错误和异常处理进行…

Ansys Zemax | 内窥镜物镜系统初始结构的优化提升(下)

系统性能提升 根据上篇的内窥镜系统分析&#xff0c;我们可以从四个方面对内窥镜物镜系统进行优化&#xff1a;元件间距、圆锥系数、MTF 值以及畸变值。点击优化-评价函数编辑器以设置具体的评价函数。&#xff08;联系我们获取文章附件&#xff09; 首先&#xff0c;用三个 CO…

NXP i.MX 8M Plus工业开发板硬件说明书--下册( 四核ARM Cortex-A53 + 单核ARM Cortex-M7,主频1.6GHz)

前 言 本文档主要介绍创龙科技TLIMX8MP-EVM评估板硬件接口资源以及设计注意事项等内容。 创龙科技TLIMX8MP-EVM是一款基于NXP i.MX 8M Plus的四核ARM Cortex-A53 单核ARM Cortex-M7异构多核处理器设计的高性能工业评估板&#xff0c;由核心板和评估底板组成。ARM Cortex-A5…