Netty入门指南之Reactor模型

news2025/1/23 8:06:55

作者简介:☕️大家好,我是Aomsir,一个爱折腾的开发者!
个人主页:Aomsir_Spring5应用专栏,Netty应用专栏,RPC应用专栏-CSDN博客
当前专栏:Netty应用专栏_Aomsir的博客-CSDN博客

文章目录

  • 参考文献
  • 前言
  • 单线程Reactor模型
  • 主从式Reactor模型
    • 多线程知识扫盲
    • Worker线程
    • Boss线程
    • 客户端
  • 总结

参考文献

  • 孙哥suns说Netty
  • Netty官方文档

前言

在我们之前的文章中,我们详细地探讨了Java NIO和Selector的相关内容,这为我们进一步的学习打下了坚实的基础。从本篇文章开始,我们将深入学习并理解Reactor模型

单线程Reactor模型

在前两篇文章,我们使用Selector去监控Channel的ACCEPT事件、WRITE事件、READ事件等等,监听到以后就在当前线程进行处理,这已经是一个单线程的Reactor的模型,Selector来进行分发,起到一个多路复用器的作用,但是这个还远远不够,怎么能只让一个线程来同时处理ACCEPT、WRITE和READ,所以就有了我们后面的主从式
在这里插入图片描述

主从式Reactor模型

谈起主从架构,也就是master-slave,它是主节点做一部分内容,从节点做另外一部分的内容,主从都干活,但是干的活内容不一样,比如我们常见的MySQLRedis,它们的读写分离就是主从式架构。
还有一种架构是主备架构(Master-Backup),这种架构就是主挂了以后,从起作用,干的活是一样的,比如Redis中的哨兵机制

结合如下图例更为详细的理解主从式Reactor模型,我们的Boss和Worker都是不同的线程,甚至在实战过程中会是不同的服务器。Boss线程主要用于接收Accept请求,去与客户端建立SocketChannel连接,Worker线程主要去处理实际的读写操作。我们需要把单线程Reactor模型中的sc.register(selector, SelectionKey.OP_READ)转移到Worker线程中去。
在这里插入图片描述

多线程知识扫盲

在接下来的学习中,我们将使用NIO和Selector来实现一个主从Reactor模型。这需要我们具备一定的多线程知识,因此这里我会为你简单介绍一下Java中的多线程。

在Java中,我们通常通过Thread类来创建和管理新的线程。在实际开发中,我们可以创建一个新的类,让它继承Thread类,并重写其run方法。在这个run方法中,我们可以编写自己的多线程任务逻辑。但是,Java的类只能单继承,这在某些情况下可能会对我们的系统设计造成限制。

因此,Java还为我们提供了另一种创建线程的方式,即通过实现Runnable接口。我们可以自定义一个类,让它实现Runnable接口,并重写其run方法,在这个方法中编写我们的多线程任务逻辑。然后,我们可以将这个Runnable实现类的对象传递给Thread类的构造方法,从而创建Thread类的对象。这种方式的优点是,我们不再需要直接继承Thread类来实现多线程任务,而是可以将任务逻辑封装在实现了Runnable接口的类中。这样,我们的类就可以在保持多线程功能的同时,也能继承其他类,从而提供更大的设计灵活性。

由于Java的Thread类实现了Runnable接口,我们可以在设计系统时,采用以下策略:在Runnable的实现类中,添加一个Thread类型的属性,并提供一个register方法。在这个register方法中,我们可以初始化Thread属性,直接将当前类对象(Runnable实现类)传入Thread构造方法进行初始化,然后启动线程。这样我们就可以直接在Runnable内部直接进行线程任务逻辑等,而外部只需要提供一个Runnable接口实现类,线程的创建和启动等都在Runnable接口内部进行操作,封装度更高也更灵活

⚠️注意

  • 启动多线程任务是通过Thread#start()方法,而不是通过Thread#run()方法,调用start方法以后,CPU的时间片也不会立马分配给这个线程
  • 除了Thread#run()和Runnable#run()方法内的代码是属于多线程的,其余的都是main线程,包括Runnable实现类中的自定义方法
  • CPU时间片不一定会等待主线程某个方法完全执行才切换给别的线程,但它一定会等一个代码块执行完,比如if
public class MyThread extends Thread{
    @Override
    public void run() {
        for (int i = 0; i < 100; i++) {
            System.out.println("线程任务逻辑" + i);
        }
    }
}
public class MyRunnable implements Runnable{
    @Override
    public void run() {
        for (int i = 0; i < 100; i++) {
            System.out.println("线程任务逻辑" + " " + i);
        }
    }
}

public class RunnableTest {
    public static void main(String[] args) {
        // 创建任务对象
        MyRunnable myRunnable = new MyRunnable();

        // 创建线程对象,并将任务传递进去
        Thread t1 = new Thread(myRunnable);

        // 启动线程
        t1.start();

        for (int i = 0; i < 100; i++) {
            System.out.println("main线程" + " " + i);
        }
    }
}

Worker线程

这是我们的Worker线程,用于处理客户端与服务端的读写,在我们这个案例中,所有的读写都交给这些worker线程,主线程不管具体的写。如果是单核CPU,那么时间片会不停的在这些时间片时间轮转,而如果是多核CPU,那直接主线程用于处理连接,多个worker线程用于处理具体的读写

在下面这个Worker模型中,我们Worker是一个Runnable实现类,其中包含一个Thread类型的属性,在register方法中对它进行初始化,将实现类本类对象传入,代表后面这个thread对象调用的任务是实现类中重写的run方法的逻辑。

⚠️注意

  • 主线程和Worker线程维护不同的selector,以免出现污染,主线程的selector监控ServerSocketChannel的ACCEPT事件,Worker线程的selector监控注册在对应线程上的SocketChannel的READ/WRITE事件
  • register方法属于主线程,如果等初始化完还没将SocketChannel注册到这个线程的Selector上,就去执行Worker线程的run方法,那selector就会成为阻塞状态,当CPU时间片切换回主线程,就会注册不上,成为一个死锁状态。
  • 为了解决上面这个问题,我们需要将注册这部分的代码放在任务队列里进行传递,但是阻塞问题还是存在,所以我们将selector唤醒,不让其阻塞。当时间片切换到Worker线程,select方法就不会阻塞,运行循环下面的代码,将注册的代码取出来运行,然后处理读写

☹️难点

  • 时间片在sc未注册到selector时就切换给worker线程导致selector阻塞,然后导致阻塞
  • 使用任务队列传递代码同时需要唤醒selector
  • 思路的转变
public class Worker implements Runnable {
    private static final Logger log = LoggerFactory.getLogger(Worker.class);

    // 一个线程对应一个selector,以免污染
    private Selector selector;

    // 线程Thread对象
    private Thread thread;

    // 线程名
    private String name;

    // 通过volatile进行线程同步
    private volatile boolean isCreated;

    // 任务队列
    private ConcurrentLinkedQueue<Runnable> runnables = new ConcurrentLinkedQueue();

    // 构造器
    public Worker(String name) {
        this.name = name;
    }

    // 线程任务(此段还属于主线程!!!)
    public void register(SocketChannel sc) throws IOException,InterruptedException {
        log.debug("worker register invoke...");

        // 对于一个Runnable对象,被调用register后
        // isCreated标志位就会被置为true
        // 注意:CPU等这个if代码块执行结束才有可能被调度到worker线程
        if (!isCreated) {
            thread = new Thread(this, name);

            // 调了start,不会立马分配资源(除非抢夺)
            thread.start();
            selector = Selector.open();
            isCreated = true;
        }

        // 模拟此处时间片分给worker线程
        // worker线程进入run方法,开始阻塞监听
        // selector就会一直阻塞在select方法上,时间片切换回主线程也无法注册
        // Thread.sleep(1000);

        // 任务队列:将main线程中注册的代码传递给worker线程
        runnables.add(() -> {
            try {
                sc.register(selector, SelectionKey.OP_READ);
            } catch (ClosedChannelException e) {
                throw new RuntimeException(e);
            }
        });

        // 唤醒阻塞在select方法上的worker线程
        // 这样时间片切换到worker线程就直接跳过select方法
        selector.wakeup();
    }

    /**
     * 线程任务:实际处理读写操作
     */
    @Override
    public void run() {
        while (true) {
            log.debug("worker run method invoke...");
            try {
                // 阻塞监听SocketChannel的OP_READ
                selector.select();

                // 从任务队列中取出任务执行
                Runnable poll = runnables.poll();
                if (poll != null) {
                    // 执行注册的步骤
                    poll.run();
                }

                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                while (iterator.hasNext()) {
                    SelectionKey scKey = iterator.next();
                    iterator.remove();

                    if (scKey.isReadable()) {
                        SocketChannel sc = (SocketChannel) scKey.channel();
                        ByteBuffer buffer = ByteBuffer.allocate(30);

                        int read = sc.read(buffer);
                        if (read == -1) {
                            scKey.cancel();
                            break;
                        }

                        buffer.flip();
                        String result = Charset.defaultCharset().decode(buffer).toString();
                        System.out.println("result = " + result);
                    }
                }
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }
}

Boss线程

Boss线程用来监听ServerSocketChannel的ACCEPT事件,监听到了以后将其传递给Worker线程去注册和监听处理,注意线程池只有两个Worker线程,为了保证每一个新进来的SocketChannel都被注册到与前一个线程不同的线程上,这里使用AtomicInteger原子操作类来处理

public class ReactorBossServer {

    private static final Logger log = LoggerFactory.getLogger(ReactorBossServer.class);

    public static void main(String[] args) throws Exception{
        log.debug("boss thread start...");

        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);
        ssc.bind(new InetSocketAddress(8000));

        Selector selector = Selector.open();
        ssc.register(selector, SelectionKey.OP_ACCEPT);

        // 模拟任务池,将任务对象进行创建
        Worker[] workers = new Worker[2];
        for (int i = 0; i < workers.length; i++) {
            // Worker worker = new Worker("worker1");
            workers[i] = new Worker("worker - " + i);
        }

        // 原子操作类
        AtomicInteger index = new AtomicInteger();

        while (true) {
            // 阻塞等待Channel的事件的触发
            selector.select();

            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            while (iterator.hasNext()) {
                SelectionKey sscSelectionKey = iterator.next();
                iterator.remove();

                // 如果是ACCEPT请求则进行处理,交给worker线程处理
                if (sscSelectionKey.isAcceptable()) {
                    SocketChannel sc = ssc.accept();
                    sc.configureBlocking(false);

                    log.debug("boss invoke worker register...");

                    // hash取模  x%2 = 0|1
                    // 通过原子类确保sc每次进来注册给不同的worker
                    workers[index.getAndIncrement() % workers.length].register(sc);

                    log.debug("boss invoke worker register...");
                }
            }
        }
    }
}

客户端

public class MyClient {

    private static final Logger log = LoggerFactory.getLogger(MyClient.class);

    public static void main(String[] args) throws Exception{
        // 1、创建客户端channel,并连接服务端
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.connect(new InetSocketAddress(8000));
        socketChannel.write(Charset.defaultCharset().encode("hello\n"));
        System.out.println("-------------------------------------");
    }
}

总结

这篇文章的阅读绝对值得你的精心研读。Netty的基础建立在NIO之上,如果单纯的学习Netty,你可能只会看到一堆的API,而无法深入理解其背后的设计原则和工作机制。

然而,本篇文章从设计理念到编码实践,详细剖析了Reactor模型,为Netty的学习铺平了道路。这不仅帮助你理解Netty的运作方式,更能让你洞察其背后的设计哲学,使你在学习时不仅知其然,更能知其所以然。因此,这篇文章对于深化你对Netty的理解,研究其内部工作原理,无疑具有极大的价值

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1205021.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

网银转账虚拟生成器在线制作,工商农业邮政建设招商,标签+对话框+画板+快照实现

标签对话框画板快照实现就实现了一个虚拟截图生成器&#xff0c;当然我加了水印了&#xff0c;这个图片你根本盗用不了&#xff0c;图片模版的话网上真的太多了&#xff0c;我这个也是网上找的&#xff0c;自己百度图库搜一下&#xff0c;然后标签记得一定用黑月的透明标签&…

c语言-数据结构-链表分割

链表分割实际上是给定一个值&#xff0c;遍历链表把链表中小于该值的节点与大于该值的节点分开&#xff0c;一般是将小于该值的节点放到链表的前面部分&#xff0c;大于该值的节点放在链表的后面部分。 链表分割示意图如下&#xff1a; 思路&#xff1a; 首先创建两条带哨兵位节…

Topk问题!(面试高频常考)

&#x1f3a5; 屿小夏 &#xff1a; 个人主页 &#x1f525;个人专栏 &#xff1a; 剑指offer &#x1f304; 莫道桑榆晚&#xff0c;为霞尚满天&#xff01; 文章目录 &#x1f4d1;前言&#x1f324;️什么是Top-k问题&#xff1f;&#x1f324;️常见的Top-K问题类型☁️寻找…

Global_Mapper_Pro_25.0安装教程大全

一. 下载&#xff1a; http://dt1.8tupian.net/2/29913a55b1000.pg3二. 介绍&#xff1a; Global Mapper Pro 25是领先的GIS数据处理解决方案&#xff01;提供了一整套符合标准的功能来提升您的操作和技能&#xff0c;您可以最合理的利用您的工具集来完成以前复杂的工作任务&a…

深度学习AIR-PolSAR-Seg图像数据预处理

文章目录 深度学习sar图像数据预处理一.图片预处理操作1.log(1x)处理2.sqrt平方化处理 二.原网络训练效果展示原始数据训练效果展示&#xff1a; 三.对比实验1.采用原始数据2.采用取log(1x)后的数据3.采用取平方后归一化处理&#xff1a; 四.总结&#xff1a;五.思考 深度学习s…

揭秘 DCNN——AlexNet

来源 — gifs.com 一、说明 还记得 2012 年的 ImageNet 视觉识别挑战赛吗&#xff1f;当然&#xff0c;你知道&#xff01;经过大量的反复试验和实验&#xff0c;研究员 Alex Krizhevsky 及其合著者 Ilya Sutskever 和 Geoffrey E. Hinton&#xff08;他真正理解了深度学习中…

Windows没有USB启动选项很常见,但解决方法更常见

当试图在计算机上重新安装Windows 11/10操作系统,或从安装介质启动时,一些用户看到错误–系统没有任何USB启动选项,请在启动管理器菜单中选择其他启动选项。此错误出现在不同OEM的多个设备,原因包括启用了安全引导、禁用了Legacy/CSM支持、联想服务引擎、未正确制作可引导U…

万宾科技智能传感器EN100-C2有什么作用?

在日常生活中井盖是一种常见的城市设施&#xff0c;但井盖出现问题可能会对人们的生活造成什么影响呢&#xff1f;移位或老化的井盖可能会威胁人们的安全&#xff0c;同时也会影响城市生命线的正常运行。然而智能井盖的出现为解决这些问题提供了有效的应对方案。 WITBEE万宾智能…

Day44 力扣动态规划 : 300.最长递增子序列|674. 最长连续递增序列 | 718. 最长重复子数组

Day44 力扣动态规划 : 300.最长递增子序列&#xff5c;674. 最长连续递增序列 &#xff5c; 718. 最长重复子数组 300.最长递增子序列第一印象看完题解的思路dp递推公式遍历顺序初始化 实现中的困难感悟代码 674. 最长连续递增序列第一印象dp状态转移公式遍历顺序初始化 看完题…

能够定时发送朋友圈的软件

此款软件提供便捷的网页端登录方式&#xff0c;让您轻松管理多个账号&#xff0c;实现多账号聚合管理&#xff0c;只需一个界面即可解决所有问题。 朋友圈内容编辑功能强大&#xff0c;让您在输入框内输入文本内容&#xff0c;点击表情图标选择表情&#xff0c;还能通过“”图标…

Go利用反射实现一个ini文件的解析器程序

package mainimport ("bufio" // 逐行读取配置文件"fmt""log""os""reflect""strconv""strings" )type Config struct { // 定义配置结构体Section1 Section1 ini:"section1" // 嵌套结构体1…

洗地机哪个牌子最好用?洗地机怎么选?2023洗地机选购推荐

家里有小孩或者是养有宠物的都有一个深刻的体验&#xff0c;那就是房子每天都很乱&#xff0c;隔三岔五就得做一次卫生清理、地板杀菌等。如果是房屋面积太大的话&#xff0c;只靠自己手动清洁是非常的耗时间并且还很劳累。洗地机的出现可谓是造福人类&#xff0c;解脱了家庭劳…

EM@一次双绝对值不等式

文章目录 一次双绝对值不等式求解步骤去绝对值情况分析&#x1f47a;例例代数法几何方法比较 例 一次双绝对值不等式求解步骤 设 f ∣ f 1 ∣ ∣ f 2 ∣ f|f_1||f_2| f∣f1​∣∣f2​∣, f 1 , f 2 f_1,f_2 f1​,f2​都是一次多项式,则原不等式 f ⩾ a f\geqslant{a} f⩾a或 …

跨越编程界限:C++到JavaSE的平滑过渡

JDK安装 安装JDK 配置环境变量&#xff1a; Path 内添加 C:\Program Files\Java\jdk1.8.0_201\bin 添加 JAVA_HOME C:\Program Files\Java\jdk1.8.0_201 添加 CLASSPATH .;%JAVA_HOME%\lib\dt.jar;%JAVA_HOME%\lib\tools.jar 第一个Java程序 HelloWorld.java public class…

记录--让我们来深入了解一下前端“三清”是什么

这里给大家分享我在网上总结出来的一些知识&#xff0c;希望对大家有所帮助 前端“三清” 在前端开发中&#xff0c;我们经常听到关于“三清”的说法&#xff0c;即 window、document、Object。这三者分别代表了 BOM(浏览器对象模型)、DOM(文档对象模型)以及 JS 的顶层对象。在…

Git精讲(一)

&#x1f4d8;北尘_&#xff1a;个人主页 &#x1f30e;个人专栏:《Linux操作系统》《经典算法试题 》《C》 《数据结构与算法》 ☀️走在路上&#xff0c;不忘来时的初心 文章目录 一、Git初识1、提出问题2、如何解决--版本控制器3、注意事项 二、Git 安装1、Linux-centos2、…

世微 升压恒压IC dc-dc转换器 充电器手持设备便携式产品 AP8660

AP8660是一款升压dc-dc转换器&#xff0c;内置MOS调节器&#xff0c;内部补偿&#xff0c;还可以最小6个外部组件&#xff0c;内部的软识启动功能可以降压涌入电流 AP8660 SOT23-6封装&#xff0c;可以为PCB提供节省空间 特点 可调输出&#xff0c;最高达到24W 内部固定PWM频…

语言大模型的预训练过程——从无尽数据到海量知识

从目前研究结果看&#xff0c;Transformer是足够强大的特征抽取器&#xff0c;通过预训练过程&#xff0c;Transformer学到了什么&#xff1f;知识是如何存取的&#xff1f;我们又如何修正错误知识&#xff1f;本文讲述这方面的研究进展。 1、求知&#xff1a;LLM学到了什么知…

【JAVA】去掉 if...else 的七种绝佳之法...

文章目录 前言方法一&#xff1a;提前 return方法二&#xff1a;枚举方案三&#xff1a;Optional 判空方案四&#xff1a;表驱动法方案五&#xff1a;策略模式 工厂方法方案六&#xff1a;责任链模式方案七&#xff1a;Function总结 前言 我相信小伙伴一定看过多篇怎么去掉 i…

解决渗透测试js文件泄露

解决办法&#xff1a;使用过滤器过滤 public class StaticSourceFilter implements Filter {private static Logger logger LoggerFactory.getLogger(StaticSourceFilter.class);Overridepublic void init(FilterConfig filterConfig) throws ServletException {}Overridepub…