Reactor 模式全解:实现非阻塞 I/O 多路复用

news2025/1/12 13:38:06

Reactor网络模式是什么?

Reactor网络模式时目前网络最常用的网络模式。如果你使用Netty,那么你在使用Reactor;如果你使用Twisted,那么你子啊使用Reactor;如果你使用netpoll,那么你在使用Reactor。

这里先给出答案:
Reactor = I/O多路复用+非阻塞I/O。

什么是I/O多路复用?

我们还是先使用文字拆解来看看每个词是什么意思吧。

拆词解释

I/O

I/O表示输入和输出,英文为Input/Output。I为输入,O为输出。我们日常编程中操作最多的无非就是网络和文件了,这两类就属于I/O,我们通常称为网络I/O和文件I/O。

下面是两个Java和Go操作文件I/O的例子:
java按行读取文件:

  try (BufferedReader br = new BufferedReader(new FileReader(fileName))) {
            String line;
            while ((line = br.readLine()) != null) {
                System.out.println(line);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }

Go按行读取文件:

    file, _ := os.Open(fileName)
    defer file.Close()

    reader := bufio.NewReader(file)
    for {
        line,err := reader.ReadString('\n')
        if err != nil {
            break
        }
    fmt.Print(line)

好的,I/O搞清楚了我们就是搞清楚多路。

多路

多路字面意思就是多条路,放在计算机网络编程中的话,一般是指多个通道或者数据源。比如:你的进程或者需要打开很多的文件或者有很多网络连接,并监控这些文件或者网络连接是否发生变化(也就是是否产生一些事件)以进行必要的处理。

复用

复用的字面意思就是重复使用。我们把多路放到计算机网络编程中的话,一般是指重复使用一个或者几个线程,这里的关键是线程一定要很少而且重复使用它来完成I/O+多路。
总的来说就是:重复使用一个或者几个献策会给你来完成多路I/O的变化(事件)处理。

给I/O多路复用下个定义

好了,有了以上的背景或许你已经对I/O多路复用有了自己的理解和定义。这儿我根据自己的理解来对I/O多路复用进行定义:
I/O多路复用就是使用一个或者几个进程或者线程来完成大量通道或者数据源的事件监控和处理。

I/O多路复用复用了什么?

复用了线程。
在没有I/O多路复用之前,可能客户端每创建一个连接服务端都需要新建一个线程来处理事件,这样10K个客户端连接就需要10K个线程,服务端应对这些连接很吃力,因为创建线程有开销,切换线程有开销,还有同步,锁,死锁等问题。

那有了I/O多路复用之后,服务端可能1个线程就可以应对10K个连接的事件。

一般使用什么技术实现I/O多路复用

I/O多路复用技术实现依赖于操作系统,但是主流操作系统都是支持,下面是三大操作系统对I/O多路复用的支持:

  1. Linux: 这个是目前的大哥,Linux使用epoll,当然还有select, poll,目前网络上基本都用epoll
  2. MacOS: Kqueue
  3. Windows: IOCP I/O完成端口

I/O多路复用就告一段落,看下非阻塞I/O。

什么是非阻塞I/O

非阻塞I/O是相对于阻塞I/O而言的,它们之间的区别就是你进行I/O操作时是否阻塞你后续的执行。非阻塞不会阻塞后续执行,而阻塞会。这就好比:
你用某App网上下单到店取一样。假设你直接到店里面用手机下单,你必须在店里等待食物准备好。在这个过程中,你不能去做其他任何事情,直到拿到东西后,你才离开。

而非阻塞I/O就像是你网上下单起手配送,在起手配送期间你可以和你的朋友或者同时唠唠嗑,等骑手把东西送到给你打电话的时候你就下去拿。

总的来说:阻塞I/O中的程序在等待I/O完成时会一直停留在相应操作上,不会执行后续的代码。与之相反,在非阻塞I/O模式下,程序会立即返回一个状态值,如果I/O尚未完成,则程序可以继续执行其他任务,然后随后再次检查I/O状态。
阻塞I/O: 死等
非阻塞I/O:立即返回,下次重试

I/O多路复用和非阻塞I/O组合到一起擦出什么样的火花?

Reactor设计模式结合了非阻塞I/O和I/O多路复用,使得单个线程就能高效地处理多个网络通信。这种结合擦出的“火花”就是使事件驱动的网络服务器变得可能,这种服务器可以以非常轻量级的方式支持大规模并发连接。

在Reactor模式中,一个中央分派器(Reactor)负责监听所有I/O事件(使用select、poll、epoll等系统调用),并且当某个事件发生时,它将调用预先注册的回调函数来处理这些事件。由于采用了非阻塞I/O,这个中央分派器在等待I/O事件时不会被阻塞,这使得它可以在任何给定时间处理上千甚至上万个不同的I/O请求。
下面是单线程的Reactor模型:

单线程Reactor模式

快速实现一个Reactor

  1. 代码关键点1:Reactor线程创建一个事件循环(可能这会勾起你想起Netty的Boss)
 // 创建线程,执行事件循环
    pthread_t accept_threads[2];
    for (int i = 0; i < 1; i++) {
        printf("create acceptor thread. index: %d\n", i);
        // run_event_loop为事件的处理函数,循环处理
        pthread_create(&accept_threads[i], NULL, run_event_loop, &server_fd);
        pthread_detach(accept_threads[i]);
    }

  1. 代码关键点2: 事件处理线程(可能这会勾起你想起Netty的Worker)
// 事件发生后的处理函数: handle_io_event
pthread_create(&worker_thread, NULL, handle_io_event, &client_fd);

  1. 代码关键点3: 非阻塞I/O
// 设置文件描述符为非阻塞I/O
fcntl(client_fd, F_SETFL, fcntl(client_fd, F_GETFL, 0) | O_NONBLOCK);

// 设置非阻塞
fcntl(server_fd, F_SETFL, fcntl(server_fd, F_GETFL, 0) | O_NONBLOCK);

  1. 完整代码
#include <stdio.h>
#include <pthread.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>
#include <signal.h>

#define PORT 12345
#define MAX_EVENTS 10
#define BUFF_SIZE 1024
#define WORKER_SIZE 4

// epoll file descriptor
int epoll_fd;

// handlers
void* run_event_loop(void* arg);
void* handle_io_event(void* arg);
void wait_to_death();

int main() {
    int server_fd;
    struct sockaddr_in server_addr;

    // 创建server
    server_fd = socket(AF_INET, SOCK_STREAM, 0);
    // 设置非阻塞
    fcntl(server_fd, F_SETFL, fcntl(server_fd, F_GETFL, 0) | O_NONBLOCK);

    // 绑定
    memset(&server_addr, 0, sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
    server_addr.sin_port = htons(PORT);

    printf("binding\n");
    bind(server_fd, (struct sockaddr *)&server_addr, sizeof(server_addr));

    printf("listen\n");
    // 监听
    listen(server_fd, MAX_EVENTS);

    printf("epoll create\n");
    // epoll创建
    epoll_fd = epoll_create1(0);

    struct epoll_event event;
    event.events = EPOLLIN;
    event.data.fd = server_fd;
    printf("epoll add\n");
    epoll_ctl(epoll_fd, EPOLL_CTL_ADD, server_fd, &event);

    // 创建线程,执行事件循环
    pthread_t accept_threads[2];
    for (int i = 0; i < 1; i++) {
        printf("create acceptor thread. index: %d\n", i);
        pthread_create(&accept_threads[i], NULL, run_event_loop, &server_fd);
        pthread_detach(accept_threads[i]);
    }

    wait_to_death();

    close(epoll_fd);
    close(server_fd);

    return 0;
}

void* run_event_loop(void* arg) {
    int server_fd = *(int*)arg;

    while (1) {
        struct epoll_event events[MAX_EVENTS];
        int n = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
        for (int i = 0; i < n; i++) {
            if (events[i].data.fd == server_fd) {
                // 新连接
                int client_fd = accept(server_fd, NULL, NULL);

                // 设置非阻塞
                fcntl(client_fd, F_SETFL, fcntl(client_fd, F_GETFL, 0) | O_NONBLOCK);

                // worker线程负责处理这个事件
                pthread_t worker_thread;
                pthread_create(&worker_thread, NULL, handle_io_event, &client_fd);
            }
        }
    }
}

void* handle_io_event(void* arg) {
    int client_fd = *(int*)arg;

    while (1) {
        char buff[BUFF_SIZE] = {0};
        int len = read(client_fd, buff, BUFF_SIZE);
        if (len <= 0) {
            close(client_fd);

            struct epoll_event event;
            event.events = EPOLLIN;
            event.data.fd = client_fd;
            epoll_ctl(epoll_fd, EPOLL_CTL_DEL, client_fd, &event);

            break;
        }
        else {
            printf("Received %s from client\n", buff);
        }
    }

    return NULL;
}

void wait_to_death() {
    sigset_t allset;
    sigemptyset(&allset);
    sigaddset(&allset, SIGINT); // Ctrl+C
    sigaddset(&allset, SIGQUIT); // Ctrl+\

    int sig;
    for (;;) {
        int err = sigwait(&allset, &sig);
        if (err == 0) {
            printf("received signal %d, prepare to exit\n", sig);
            break;
        }
    }
}

搞定收工,如有错误请指正,谢谢

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1540079.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

机器学习——神经网络基础

一、神经网络基本概念 神经网络可以分为生物神经网络和人工神经网络 (1)生物神经网络,指的是生物脑内的神经元、突触等构成的神经网络&#xff0c;可以使生物体产生意识&#xff0c;并协助生物体思考、行动和管理各机体活动。 (2)人工神经网络,是目前热门的深度学习的研究…

hadoop基本概念

一、概念 Hadoop 是一个开源的分布式计算和存储框架。 Hadoop 使用 Java 开发&#xff0c;所以可以在多种不同硬件平台的计算机上部署和使用。其核心部件包括分布式文件系统 (Hadoop DFS&#xff0c;HDFS) 和 MapReduce。 二、HDFS 命名节点 (NameNode) 命名节点 (NameNod…

基于python+vue畅游游戏销售平台flask-django-php-nodejs

本文拟采用Eclipse开发工具&#xff0c;python语言&#xff0c;django/flask框架进行开发&#xff0c;后台使用MySQL数据库进行信息管理&#xff0c;设计开发的畅游游戏销售平台。通过调研和分析&#xff0c;系统拥有管理员和用户两个角色&#xff0c;主要具备注册登录、个人信…

CI/CI实战-jenkis结合gitlab 4

实时触发 安装gitlab插件 配置项目触发器 生成令牌并保存 配置gitlab 测试推送 gitlab的实时触发 添加jenkins节点 在jenkins节点上安装docker-ce 新建节点server3 安装git和jdx 在jenkins配置管理中添加节点并配置从节点 关闭master节点的构建任务数

数据结构/C++:哈希表

数据结构/C&#xff1a;哈希表 哈希表概念哈希函数直接定址法除留余数法 哈希冲突闭散列 - 开放定址法基本结构查找插入删除总代码展示 开散列 - 哈希桶基本结构查找插入删除代码展示 哈希表概念 在顺序表中&#xff0c;查找一个数据的时间复杂度为O(N)&#xff1b;在平衡树这…

城管智慧执法系统源码,基于微服务+java+springboot+vue开发

城管智慧执法系统源码&#xff0c;基于微服务javaspringbootvue开发 城管智慧执法系统源码有演示&#xff0c;自主研发&#xff0c;功能完善&#xff0c;正版授权&#xff0c;可商用上项目。 一套数字化的城管综合执法办案系统源码&#xff0c;提供了案件在线办理、当事人信用…

Reactor Netty

在springframework 里面&#xff0c;我们只有connection id。但是在底层的reactor netty,我们除了connection id还有local address and remote address HTTP/1 HTTP/2

IT运维服务规范标准与实施细则

一、 总则 本部分规定了 IT 运维服务支撑系统的应用需求&#xff0c;包括 IT 运维服务模型与模式、 IT 运维服务管理体系、以及 IT 运维服务和管理能力评估与提升途径。 二、 参考标准 下列文件中的条款通过本部分的引用而成为本部分的条款。凡是注日期的引用文件&#xff0c…

PSO-CNN-SVM,基于PSO粒子群优化算法优化卷积神经网络CNN结合支持向量机SVM数据分类(多特征输入多分类)-附代码

PSO-CNN-SVM&#xff0c;基于PSO粒子群优化算法优化卷积神经网络CNN结合支持向量机SVM数据分类 下面是一个大致的步骤&#xff1a; 数据准备&#xff1a; 准备训练集和测试集数据。对数据进行预处理&#xff0c;包括归一化、标准化等。 设计CNN模型&#xff1a; 设计合适的CNN…

微信小程序----猜数字游戏.

目标&#xff1a;简单猜字游戏&#xff0c;系统随机生成一个数&#xff0c;玩家可以猜8次&#xff0c;8次未猜对&#xff0c;游戏结束&#xff1b;未到8次猜对&#xff0c;游戏结束。 思路和要求&#xff1a; 创建四个页面&#xff0c;“首页”&#xff0c;“开始游戏”&#…

AJAX介绍使用案例

文章目录 一、AJAX概念二、AJAX快速入门1、编写AjaxServlet&#xff0c;并使用response输出字符&#xff08;后台代码&#xff09;2、创建XMLHttpRequest对象&#xff1a;用于和服务器交换数据 & 3、向服务器发送请求 & 4、获取服务器响应数据 三、案例-验证用户是否存…

以太坊基金会JUSTIN DRAKE确认出席Hack.Summit() 2024区块链开发者大会

以太坊基金会JUSTIN DRAKE确认将出席由Hack VC主办&#xff0c;AltLayer、Berachain协办&#xff0c;并获得了Solana、The Graph、Blockchain Academy、ScalingX、0G、SNZ以及数码港的大力支持&#xff0c;本次大会由Techub News承办的Hack.Summit() 2024区块链开发者盛会。 Ju…

在Sequence中缓存Niagara粒子轨道

当Sequence中粒子特效较多时&#xff0c;播放检查起来较为麻烦&#xff0c;而使用Niagara缓存功能可将粒子特效方便的缓存起来&#xff0c;并且还可以更改播放速度与正反播放方向&#xff0c;便于修改。 1.使用Niagara缓存需要先在插件里打开NiagaraSimCaching 2.创建一个常…

Linux的学习之路:2、基础指令(1)

一、ls指令 上篇文章已经说了一点点的ls指令&#xff0c;不过那还是不够的&#xff0c;这篇文章会介绍更多的指令&#xff0c;最起码能使用命令行进行一些简单的操作&#xff0c;下面开始介绍了 ls常用选项 -a 列出目录下的所有文件&#xff0c;包括以 . 开头的隐含文件。 -d…

AIGC——ComfyUI SDXL多种风格预设提示词插件安装与使用

概述 SDXL Prompt Styler可以预先给SDXL模型提供了各种预设风格的提示词插件&#xff0c;相当于预先设定好了多种不同风格的词语。使用这个插件&#xff0c;只需从中选取所需的风格&#xff0c;它会自动将选定的风格词汇添加到我们的提示中。 安装 插件地址&#xff1a;http…

鸿蒙一次开发,多端部署(十三)功能开发的一多能力介绍

应用开发至少包含两部分工作&#xff1a; UI页面开发和底层功能开发&#xff08;部分需要联网的应用还会涉及服务端开发&#xff09;。前面章节介绍了如何解决页面适配的问题&#xff0c;本章节主要介绍应用如何解决设备系统能力差异的兼容问题。 系统能力 系统能力&#xff…

基于python+vue电影院订票信息管理系统flask-django-php-nodejs

根据此问题&#xff0c;研发一套电影院订票信息管理系统&#xff0c;既能够大大提高信息的检索、变更与维护的工作效率&#xff0c;也能够方便信息系统的管理运用&#xff0c;从而减少信息管理成本&#xff0c;提高效率。 该电影院订票信息管理系统采用B/S架构、前后端分离以及…

GuLi商城-商品服务-API-三级分类-网关统一配置跨域

参考文档&#xff1a; https://tangzhi.blog.csdn.net/article/details/126754515 https://github.com/OYCodeSite/gulimall-learning/blob/master/docs/%E8%B0%B7%E7%B2%92%E5%95%86%E5%9F%8E%E2%80%94%E5%88%86%E5%B8%83%E5%BC%8F%E5%9F%BA%E7%A1%80.md 谷粒商城-day04-完…

是德科技N9020A信号分析仪

181/2461/8938产品概述&#xff1a; N9020A MXA信号分析仪通过增加针对新一代技术的信号分析和频谱分析能力&#xff0c;具备了中档分析仪的更高性能。它突破了以往分析仪的极限&#xff0c;支持业界更快的信号和频谱分析,实现了速度与性能的更佳优化。 速度 测试速度超过其它…

电子电器架构 —— 诊断数据DTC起始篇(下)

电子电器架构 —— 诊断数据DTC起始篇(下) 我是穿拖鞋的汉子,魔都中坚持长期主义的汽车电子工程师(Wechat:gongkenan2013)。 老规矩,分享一段喜欢的文字,避免自己成为高知识低文化的工程师: 本就是小人物,输了就是输了,不要在意别人怎么看自己。江湖一碗茶,喝完再…