Netty Reactor 模式解析

news2025/1/23 13:02:42

目录

Reactor 模式        

具体流程

配置 

初始化

NioEventLoop 

ServerBootstrapAcceptor 分发


Reactor 模式        

在刚学 Netty 的时候,我们肯定都很熟悉下面这张图,它就是单Reactor多线程模型。

在写Netty 服务端代码的时候,下面的代码时必不可少的,这是为什么呢?

public static void main(String[] args) {
    EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    EventLoopGroup workerGroup = new NioEventLoopGroup(4);
    ServerBootstrap bootstrap = new ServerBootstrap();
    bootstrap.group(bossGroup, workerGroup)
              .channel(NioServerSocketChannel.class)
              .childHandler(new NettyServerHandler(), new NettyServerHandler2());
    System.out.println("netty server start...");
    bootstrap.bind(9000);
}

        在 Netty 里,EventLoopGroup 就是线程池,不论 bossGroup 还是 workerGroup,它们里面的线程都叫 EventLoop。

        EventLoopGroup 就是一个线程池,bossGroup 叫连接线程池,它一般只有一个线程,workerGroup 叫工作线程池,它一般会有多个线程。bossGroup 线程池里的线程专门监听客户端连接事件,监听是否有 SelectionKey.OP_ACCEPT 事件被触发,所以 1 个线程就够用了,当它监听到有客户端请求连接时,它会把这个连接交给 workerGroup 里的一个线程去处理,这个过程叫分发,这个工作线程会为这个客户端建立一个 NIOSocketChannel,并注册到这个工作线程绑定的IO多路复用选择器 Selector 里,一个Selector可以接受多个 NIOSocketChannel 的注册,所以一个工作线程可以处理多个客户端。   这就是Reactor 模式,一个工作线程可以处理多个客户端,比 Java 传统的一个客户端对应一个工作线程节约了很多线程,减少了大量线程创建,线程切换,线程销毁的开销,所以Netty 性能很好。

        上面短短的服务端代码做了很多工作,当它刚启动还没有客户端请求连接时,bossGroup 连接线程池里的一个线程 EventLoop 会初始化一个 NioServerSocketChannel ,并把这个Channel注册到这个EventLoop 持有的IO多路复用选择器Selector里,Selector 会监听Channel里的 SelectionKey.OP_ACCEPT 事件,一旦有客户端连接过来,它会通过下面代码获取到一个

SocketChannel ch = javaChannel().accept();

NioSocketChannel,并把这个 NioSocketChannel 注册到 workerGroup 工作线程池里的一个EventLoop 里,它使用了一个叫 ServerBootstrapAcceptor 的 ChannelInboundHandler接口类去完成这个过程,连接完成后,后续这个客户端和服务端的交互和数据读写都在这个 EventLoop 完成。

具体流程

        下面我们看一下代码,Netty 代码中使用了很多继承,在继承中可以把子类相同的部分代码提到父类去完成,很多子类生成初始化的时候,它会调用父类的构造方法去完成,这个要注意。

配置 

        下面的代码主要做一些启动器的配置,group(bossGroup, workerGroup) 会设置连接线程池和工作线程池,后面有连接事件或读事件过来要处理时,它会从这些线程池里取线程去执行;channel(NioServerSocketChannel.class) 指定要生成服务端Channel,它只会监听SelectionKey.OP_ACCEPT 事件,childHandler(new NettyServerHandler(), new NettyServerHandler2()) 是我们业务处理的逻辑。

bootstrap.group(bossGroup, workerGroup)
        .channel(NioServerSocketChannel.class)
        .childHandler(new NettyServerHandler(), new NettyServerHandler2());

初始化

        bind() 会把 ServerBootstrapAcceptor 添加到 NioServerSocketChannel 的 pipeline ,它会处理连接;获取一个 bossGroup 线程池里的 EventLoop并和 NioServerSocketChannel  进行绑定。

bootstrap.bind(9000)
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> {
    public void bind(int inetPort) {
        doBind(new InetSocketAddress(inetPort));
    } 
    // bind 流程
    private void doBind(final SocketAddress localAddress) {
        initAndRegister();
        // 让channel绑定的线程处理
        channel.eventLoop().execute(()->{
            // 绑定指定端口
            channel.bind(localAddress);
        });
    } 
    // 初始化和注册
    final void initAndRegister() {
        init(channel);
        // 把 NioServerSocketChannel 注册到一个复杂连接事件的 EventLoop 的 Selector 里 
        group.register(channel);
    } 
    // 把 ServerBootstrapAcceptor 添加到 NioServerSocketChannel 的 pipeline 里
    abstract void init(Channel channel);
}

NioEventLoop 

        现在要说一下 NioEventLoop,它拥有一个IO多路复用选择器 Selector,这个线程会在一个死循环里工作,永远也会停止;这个线程它会先执行一下 selector.select(1000),阻塞监听1秒,看看有没有Channel有事件过来,有就去处理任务,没有就等待1秒钟再超时放弃,再看看自己的任务队列有没有可执行的任务,有就去处理任务,没有就继续进行死循环,继续执行 selector.select(1000)。无论是连接线程还是工作线程都这样处理,因为它们共用了这套逻辑。

public class NioEventLoop extends SingleThreadEventLoop {
    @Override
    protected void run() {
        for (;;) {
            try {
                select();
            } catch (IOException e) {
                e.printStackTrace();
            }
            try {
                # 处理事件
                processSelectedKeys();
            } finally {
                runAllTasks();
            }
        }
    }
    private void select() throws IOException {
        // 拿到多路复用器
        Selector selector = this.selector;
        for (;;) {
            // 等待,简化固定1秒
            int selectedKeys = selector.select(1000);
            // 如果有事件发生或当前有任务跳出循环
            if (selectedKeys != 0 || hasTasks()) {
                break;
            }
        }
    }
}

像下面这种 channel.eventLoop().execute(Runnable), 它也只是把 Runnable 加入到任务处理队列,稍后执行。

public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> { 
    private void doBind(final SocketAddress localAddress) {
        ...
        channel.eventLoop().execute(()->{ 
            channel.bind(localAddress);
        });
    }  
}
public abstract class SingleThreadEventExecutor implements Executor {    
    // 待执行任务队列
    private final Queue<Runnable> taskQueue;   
    @Override
    public void execute(Runnable task) {
        // 把任务添加到 EventLoop 的任务队列,EventLoop 是 SingleThreadEventExecutor 的子类
        addTask(task);
        // 执行 EventLoop 的 run 逻辑
        startThread();
    }
}

        当 NioServerSocketChannel.accept() 监听到一个客户端连接,它会把这个 NIOSocketChannel 通过 pipeline 处理,最终被 ServerBootstrapAcceptor 所处理,

public class NioServerSocketChannel extends AbstractNioMessageChannel {
    @Override
    protected int doReadMessages(List<Object> buf) {
        SocketChannel ch = null;
        try {
            ch = javaChannel().accept();
        } catch (IOException e) {
        }
        if (ch != null) {
            buf.add(new NioSocketChannel(this, ch));
            return 1;
        }
        return 0;
    }
}
public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
    @Override
    public void read() {
        final ChannelPipeline pipeline = pipeline();
        doReadMessages(readBuf);
        int size = readBuf.size();
        for (int i = 0; i < size; i ++) {
            pipeline.fireChannelRead(readBuf.get(i));
        }
        readBuf.clear();
    }

    protected abstract int doReadMessages(List<Object> buf);
}

ServerBootstrapAcceptor 分发

        ServerBootstrapAcceptor 管理 workerGroup 里的所有工作线程和所有的业务处理代码 ChannelHandler,ServerBootstrapAcceptor 会把所有的 ChannelHandler 放到刚刚监听得到的 NIOSocketChannel 里的 pipeline 里,并从 workerGroup 里选择一个 EventLoop 工作线程把NIOSocketChannel 注册到该 EventLoop 拥有的IO多路复用选择器 Selector 里去,这就完成了分发,它已经处理了连接,后续这个 NIOSocketChannel 里的所有读写事件都会被 Selector 监听到,并被该 EventLoop 工作线程所处理。

private static class ServerBootstrapAcceptor implements ChannelInboundHandler {
        // 工作线程池,即 workerGroup 
        private final EventLoopGroup childGroup;
        // 业务操作 Handler
        private final ChannelHandler[] childHandlers;

        private ServerBootstrapAcceptor(EventLoopGroup childGroup, ChannelHandler[] childHandlers) {
            this.childGroup = childGroup;
            this.childHandlers = childHandlers;
        }

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            final Channel child = (Channel) msg;
            // 完成 pipeline 责任链模式的组装
            for (ChannelHandler childHandler : childHandlers) {
                child.pipeline().addLast(childHandler);
            }
            // 把Channel 注册到 Selector
            childGroup.register(child);
        }

        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            // 略
        }
    }
 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1409078.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

vue 解决:Module not found: Error: Can‘t resolve ‘vue-router‘ 的问题

1、问题描述&#xff1a; 其一、报错为&#xff1a; Module not found: Error: Cant resolve vue-router 中文为&#xff1a; 找不到模块&#xff1a;错误&#xff1a;无法解析“vue-router” 其二、问题描述为&#xff1a; 根据报错的中文信息可知&#xff1a;应该是无法…

PWN入门Protostar靶场Stack系列

Protostar靶场地址 https://exploit.education/protostar/溢出 源码分析 #include <stdlib.h> #include <unistd.h> #include <stdio.h>int main(int argc, char **argv) {volatile int modified; //定义一个变量char buffer[64]; //给…

Git服务器、GitLab介绍及搭建、HIS代码托管、CI/CD概述、Jenkins部署、Jenkins插件、Jenkins工程构建

案例1&#xff1a;GitLab服务器搭建 使用rpm包本地部署GitLab服务器 #确认GitLab主机硬件配置[rootGitLab ~]# free -mtotal used free shared buff/cache availableMem: 3896 113 3691 8 90 3615…

day31WEB攻防-通用漏洞文件上传js验证mimeuser.ini语言特性

目录 1.JS验证 2.JS验证MIME 3.JS验证.user.ini 4.JS验证.user.ini短标签 &#xff08;ctfshow154&#xff0c;155关&#xff09; 5.JS验证.user.ini短标签过滤 [ ] 6.JS验证.user.ini短标签加过滤文件头 有关文件上传的知识 1.为什么文件上传存在漏洞 上传文件…

视频汇聚/云存储平台EasyCVR级联上级播放后一直发流是什么原因?

可视化云监控平台/安防视频监控系统EasyCVR视频综合管理平台&#xff0c;采用了开放式的网络结构&#xff0c;可以提供实时远程视频监控、视频录像、录像回放与存储、告警、语音对讲、云台控制、平台级联、磁盘阵列存储、视频集中存储、云存储等丰富的视频能力&#xff0c;同时…

大创项目推荐 题目: 基于深度学习的疲劳驾驶检测 深度学习

文章目录 0 前言1 课题背景2 实现目标3 当前市面上疲劳驾驶检测的方法4 相关数据集5 基于头部姿态的驾驶疲劳检测5.1 如何确定疲劳状态5.2 算法步骤5.3 打瞌睡判断 6 基于CNN与SVM的疲劳检测方法6.1 网络结构6.2 疲劳图像分类训练6.3 训练结果 7 最后 0 前言 &#x1f525; 优…

spire.doc合并word文档

文章目录 spire.doc合并word文档1. 引入maven依赖2. 需要合并的word3. 合并文档代码4. 合并结果 spire.doc合并word文档 1. 引入maven依赖 <repositories><repository><id>com.e-iceblue</id><name>e-iceblue</name><url>https://r…

JVM篇----第五篇

系列文章目录 文章目录 系列文章目录前言一、Java 中堆和栈有什么区别?二、描述一下 JVM 加载 class 文件的原理机制三、GC 是什么?为什么要有 GC?前言 前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站,这篇文章男女通…

LIMS源码,实验室信息系统源码,后端框架:asp.net

LIMS(laboratory information management system)即实验室信息管理系统是实验室管理科学发展的成果&#xff0c;是实验室管理科学与现代信息技术结合的产物&#xff0c;是利用计算机网络技术、数据存储技术、快速数据处理技术等&#xff0c;对实验室进行全方位管理的计算机软件…

利用tpu-mlir工具将深度学习算法模型转成算能科技平台.bmodel模型的方法步骤

目录 1 TPU-MLIR简介 2 开发环境搭建 2.1 下载镜像 2.2 下载SDK 2.3 创建容器 2.4 加载tpu-mlir 3 准备工作目录 4 onnx转mlir文件 5 mlir转INT8 模型 5.1 生成校准表 5.2 便以为INT8对称量化模型 参考文献&#xff1a; 之前是用nntc转算能科技的模型的&#xff0c…

ExperimentalWarning: The http2 module is an experimental API.

错误提示 Node.js:ExperimentalWarning: The fs.promises API is experimental原因是node的版本不是最新的&#xff0c;而在项目引入的模块是最新的&#xff0c;node.js的版本低于模块的版本&#xff1a; 解决方法: 1、升级版本 npm install -g npm 更新npm到最新版 npm ins…

【C++干货铺】 RAII实现智能指针

个人主页点击直达&#xff1a;小白不是程序媛 C系列专栏&#xff1a;C干货铺 代码仓库&#xff1a;Gitee 目录 为什么需要智能指针&#xff1f; 内存泄漏 什么是内存泄漏&#xff0c;内存泄露的危害 内存泄漏的分类 堆内存泄漏&#xff08;Heap leak&#xff09; 系统资…

内衣洗衣机有必要买吗?五款好用的迷你洗衣机推荐

冬天正在临近&#xff0c;普通的衣服有日常的洗衣机洗&#xff0c;但内衣裤就成了很多小伙伴的困扰&#xff0c;在我们的观念中&#xff0c;内衣裤是绝对不可以和普通的衣服一起清洗&#xff0c;在冰冷的冬季还要手洗这些贴身衣物&#xff0c;真的很难受&#xff0c;所以拥有一…

fpga外置flash程序烧录流程

Fpga外置FLASH程序烧录流程&#xff1a; step1&#xff1a; 打开vivado2019.2软件&#xff0c;找到hardware manager选项&#xff0c;进入该功能界面&#xff1b; Step2&#xff1a; 确定连接状态&#xff0c;当JTAG正确连接到板卡的调试插针后&#xff0c;会在状态窗口显示…

RNN与NLP

目录 数据处理基础&#xff1a; 处理文本信息&#xff08;text -> sequence&#xff09;&#xff1a; simple RNN模型&#xff1a; 这个教程的笔记&#xff1a; RNN模型与NLP应用(1/9)&#xff1a;数据处理基础_哔哩哔哩_bilibili 数据处理基础&#xff1a; 不能用标量…

eBay在人工智能道路上的成败得失:衡量标准是关键

我是2006年加入eBay的。2009年&#xff0c;这家公司的运营状况非常糟糕&#xff0c;其股价创历史新低&#xff08;远低于近24美元的历史高位&#xff09;&#xff0c;还出现削减各项成本、负增长、市场占有率降低、技术团队缺乏创新能力等情况。 简而言之&#xff0c;eBay公司…

[极客大挑战 2019]LoveSQL1

万能密码测试&#xff0c;发现注入点 注意这里#要使用url编码才能正常注入 测试列数&#xff0c;得三列 查看table&#xff0c;一个是geekuser另一个是l0ve1ysq1 查看column&#xff0c;有id&#xff0c;username&#xff0c;password&#xff0c;全部打印出来&#xff0c;…

未来已来:AI引领智能时代的多领域巨变

大家好&#xff0c;今天我们将深入探讨人工智能如何彻底改变我们的生活方式&#xff0c;领略未来的无限可能性。 1. 医疗革新&#xff1a;AI担任超级医生 医疗领域是AI最引人注目的战场之一。智能医学影像诊断系统&#xff0c;不仅能够精准识别病变&#xff0c;还能辅助医生提…

excel统计分析——Tukey‘s-b法多重比较

参考资料&#xff1a;生物统计学 Tukeys-b多重比较法是对Tukey法和S-N-K法的综合&#xff0c;取两种方法临界值的各1/2合成。临界值表达式为&#xff1a; 其中&#xff0c;m为秩次距&#xff0c;k为样本平均数的个数&#xff0c;df为误差项自由度&#xff0c; Tukey多重比较具…

计算机网络 第4章(网络层)

系列文章目录 计算机网络 第1章&#xff08;概述&#xff09; 计算机网络 第2章&#xff08;物理层&#xff09; 计算机网络 第3章&#xff08;数据链路层&#xff09; 计算机网络 第4章&#xff08;网络层&#xff09; 文章目录 系列文章目录1. 概述1.1 简介1.2 总结 2. 网络…