Java-NIO篇章(4)——Reactor反应器模式

news2025/1/13 13:27:18

前面已经讲过了Java-NIO中的三大核心组件Selector、Channel、Buffer,现在组件我们回了,但是如何实现一个超级高并发的socket网络通信程序呢?假设,我们只有一台内存为32G的Intel-i710八核的机器,如何实现同时2万个客户端高并发非阻塞通信?可能你会说不可能实现,答案是2万的并发可能都低估了,Redis单机通信20万的并发都是可以的,当然达到20万的并发对机器性能以及带宽都需要非常高的要求。那么就不得不引出今天讲解的Reactor反应器模式,它可以说是一种高并发网络编程中的设计模式,不包括在我们常说的23中设计模式之中。Netty网络框架、Nginx服务器、Reids缓存等大名鼎鼎的中间件都是基于Reactor反应器模式设计的,它就能提供超高并发的网络通信,我学过之后一直感叹这些大佬都是奇才,学这些思想精彩万分!下面具体进行介绍:

Reactor是什么?

Reactor就是一种网络编程的设计模式,如果不知道Reactor那么学Netty的时候会非常困难,因为很多概念就是Reactor,因此学会了Reactor在学Netty就非常简单。其次,懂得高并发中间件的网络通信设计的底层原理对提升自己的技术也是非常重要的,所以,学习像Netty这样的“精品中的精品”框架,肯定也是需要先从设计模式入手的。而Netty的整体架构,就是基于这个著名反应器模式。所以,学习和掌握反应器模式,对于开始学习高并发通信(包括Netty框架)的人来说,一定是磨刀不误砍柴工,况且很多中间件都是基于Netty来设计网络通信模块的。

思维风暴开启Reactor之路

好的,我们用一个例子开始讲解Reactor原理,假设你是Doug Lea,Java JUC包的作者, 也是Reactor设计模式的提出者之一。现在面临的一个问题就是现在的软件系统不能够满足日益增长的并发量,很多软件系统一旦人访问数多了要么卡死要么阻塞一段时间才有响应,用户体验非常差,现在公司提出了这个需求需要你解决。请你思考:

单线程阻塞模式

首先TCP网络通信需要先建立连接(三次握手)然后才可以传输数据,于是你写下了第一行解决的代码:

1 while(true){
2     socket = accept(); //阻塞,接收连接
3     handle(socket) ; //读取数据、业务处理、写入结果
4 }

5 private void handle(socket){
6     String msg = socket.read();  //阻塞,读取客户端发送过来的数据
7     System.out.println(msg);
8	  .... // 其他处理
9 }

解释一下,上面采用一个循环的方式来解决这个问题,程序占用一个主线程不断执行while循环中的代码,当代码执行到第2行时如果没有客户端发生连接的请求则阻塞,不继续向下执行。直到某个客户端发生连接请求,于是获得了socket对象,这个对象假设包括客户端的ip地址和端口号,并且可以通过socket与客户端接受和发送数据。之后执行到第6行代码,这里也会阻塞直到用户发生了数据。上面的服务器代码如果只有一个客户端与它交互是没有问题的,如果超过一个用户与之交互则会发生阻塞的情况,假设有两个客户A和B,A已经连接好了服务器也就是上面代码执行到了第6行代码进行阻塞,此时服务器希望收到客户发送的数据。就在阻塞的这个时候,如果B想要连接服务器,发送了连接请求,但是服务器代码一直卡在第6行等待获取客户端的发生数据,如果A一直不发送数据则B永远连不上服务器。除非等到A发送了一个数据,于是程序运行到第2行,然后接受B的连接请求,然后又卡在了第6行。很明显,上面的网络编程服务程序很糟糕,非常卡,连得上连不上完全看运气。失败!

这个时候,Doug Lea进行思考,阻塞是因为网络编程就是基于事件触发的,也就是说接受连接的第二行代码和读取数据的第六行代码完全取决于客户端,什么时候触发完全随机,因此很难搞。另外一个最主要的原因是这个是单线程程序,那么使用多线程能不能解决呢?答案是基本上可以解决,而且早期的Tomcat服务器就是这样设计的,这个模式就叫做 Connection Per Thread模式。下面进行详细介绍!

多线程经典Connection Per Thread模式

Connection Per Thread 即一个连接创建一个线程来处理,首先我们分析一下一台上述的内存32G的机器可以创建多少个线程,Java虚拟机默认一个线程占用1MB的栈内存,在不考虑其他情况下,假设分配给了虚拟机栈20G的空间,那么可以创建20*1024个线程来应对网络连接,也就是可以同时并发20480个客户端的请求。我们先看如何实现,再看它的缺点是什么,实现代码如下:

public class ConnectionPerThread implements Runnable {
    @Override
    public void run(){
        Socket socket = new Socket();
        while(true){
            acceptedSocket = socket.accept(); //依旧是阻塞方法,接受客户端的连接请求
            // 如果有一个连接就立即创建一个线程为这个连接服务,直到连接断开
            Handler handler = new Handler(socket);
            new Thread(handler).start(); // 启动新线程执行run方法
        }
    }

    class Handler implements Runnable{
        Socket socket;
        public Handler(Socket socket){
            this.socket = socket;
        }
        @Override
        public void run() {
            while (true){
                String msg = socket.read(); //依旧是阻塞方法,接受客户端的发送的数据
                if("close".equals(msg)){ // 假设客户端主动断开发送`close`字符,NIO中是空字符串表示断开
                    break; // 终止线程
                }
                // 也可以执行写操作,如果是发送大数据会明显阻塞,如果小文件可视为非阻塞,本质还是会阻塞
                socket.write("hello 用户!");
            }
        }
    }
}

以上的Socket使用的是伪代码,实际上需要使用OIO或者NIO的ServerSocket对象,反正能够表达这个意思就行。其实上面的代码还可以使用线程池来维护线程进行优化,但是这里只是为了举例说明多线程也是可以的实现较高并发的网络通信。下面来具体分析:

以上示例代码中,对于每一个新的网络连接都分配给一个线程。每个线程都独自处理自己负责的socket连接的输入和输出。当然,服务器的监听线程也是独立的,任何的socket连接的输入和输出处理,不会阻塞到后面新socket连接的监听和建立,这样,服务器的吞吐量就得到了提升。早期版本的Tomcat服务器,就是这样实现的。Connection Per Thread模式(一个线程处理一个连接)的优点是:解决了前面的新连接被严重阻塞的问题,在一定程度上,较大的提高了服务器的吞吐量。Connection Per Thread模式的缺点是:对应于大量的连接,需要耗费大量的线程资源,对线程资源要求太高。在系统中,线程是比较昂贵的系统资源。如果线程的数量太多,系统无法承受。而且,线程的反复创建、销毁、线程的切换也需要代价。因此,在高并发的应用场景下,多线程OIO的缺陷是致命的。新的问题来了:如何减少线程数量,比如说让一个线程同时负责处理多个socket连接的输入和输出,行不行呢? 可以的,一个有效途径是:使用Reactor反应器模式。用反应器模式对线程的数量进行控制,做到一个线程处理大量的连接。它是如何做到呢?直接上正餐——多线程的Reactor反应器模式。

多线程Reactor反应器模式

唤醒你的回忆,还记得Selector和IO多路复用不?不记得的话请访问:https://blog.csdn.net/cj151525/article/details/135695467 查看!我们前面讲到,客户端的连接和发送数据等行为是以IO事件的方式触发Selector的查询的,仅仅使用一个线程的Selector模式,就可以应付大量的访问,其主旨就是:如果某个用户阻塞了那本线程就去为别的需要服务的用户服务,而不是傻傻等待你阻塞解除,总而言之就是线程只为通过Selector.select()查询出来的需要执行的事件服务。因此,单线程下效率就非常高,例如Redis的数据处理模块就是单线程的,单线程的优点就是线程安全,CPU不需要频繁上下文切换。这种模式下,并发量上10万都是简简单单的。那么你敢想想如果我们引进多线程将会有多高的并发量吗?线程并不是越多越好,当你的线程数量和你的CPU核心数相同时就不会频繁发生CPU上下文切换,当线程数远远超过CPU核心数才会频繁发生导致执行效率不高,甚至阻塞等问题。好的,目前基础已经讲解完毕,下面正式引入Reactor反应器模式。

引用一下Doug Lea大师在文章《Scalable IO in Java》中对反应器模式的定义,具体如下:Reactor反应器模式由Reactor反应器线程、 Handlers处理器两大角色组成,两大角色的职责分别如下:

(1) Reactor反应器线程的职责:负责响应IO事件,并且分发到Handlers处理器。

(2) Handlers处理器的职责:非阻塞的执行业务处理逻辑。

每一个单独线程执行的Selector我们就叫做Reactor反应器。一个Reactor反应器包括一个Selector对象,另外还有需要干的活儿,也就是run方法中需要执行的逻辑,这个逻辑叫做Handler处理器。因此,如何理解Reactor反应器,就是单独线程来执行的Selector。明白了这些之后,那么我们将Selector分为Boss和Worker,Boss只有一位负责用户的连接请求与任务分发,Worker可以有很多,负责发送和接受用户的数据以及处理这些数据的中间过程。Boss和每个Worker就是一个Reactor,多线程Reactor反应器模式的模型如下(黄色的是方法,橙色是对象):

在这里插入图片描述

下面是代码实现,注意为了和Netty中EventLoop概念一致,这里Reactor使用EventLoop替代,你只要知道这两的概念是同一个,就是单独线程执行的Selector。代码如下:

package com.cheney.nioBaseTest;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @version 1.0
 * @Author Chenjie
 * @Date 2024-01-21 18:39
 * @注释
 */
public class ReactorTest {
    public static void main(String[] args) throws IOException {
        new BossEventLoop().register();
    }

    /**
     * BossReactor,EventLoop和Reactor是同一个概念
     */
    @Slf4j
    static class BossEventLoop implements Runnable {
        private Selector bossSelector;
        private WorkerEventLoop[] workers; // 一个boss负责分配任务,worker负责执行任务
        private volatile boolean start = false; // 对象的方法只能执行一次
        AtomicInteger index = new AtomicInteger(); // WorkerEventLoop[]数组的下标

        public void register() throws IOException {
            if (!start) {
                // 连接Channel
                ServerSocketChannel ssc = ServerSocketChannel.open();
                ssc.bind(new InetSocketAddress(8080));
                ssc.configureBlocking(false);
                bossSelector = Selector.open();
                // Boss 注册连接事件
                SelectionKey ssckey = ssc.register(bossSelector, 0, null);
                ssckey.interestOps(SelectionKey.OP_ACCEPT);
                // 创建若干个WorkerReactor来读取发送数据
                workers = initEventLoops();
                // 本Boss一个线程启动起来先
                new Thread(this, "boss").start();
                log.debug("boss start...");
                start = true;
            }
        }

        /**
         * 创建若干个WorkerEventLoop
         * @return
         */
        public WorkerEventLoop[] initEventLoops() {
//        EventLoop[] eventLoops = new EventLoop[Runtime.getRuntime().availableProcessors()];
            WorkerEventLoop[] workerEventLoops = new WorkerEventLoop[2];
            for (int i = 0; i < workerEventLoops.length; i++) {
                workerEventLoops[i] = new WorkerEventLoop(i);
            }
            return workerEventLoops;
        }

        /**
         * Boss需要执行连接和任务分发,就是概念中的Handler处理器,图中的AcceptorHandler
         */
        @Override
        public void run() {
            while (true) {
                try {
                    bossSelector.select();
                    Iterator<SelectionKey> iter = bossSelector.selectedKeys().iterator();
                    while (iter.hasNext()) {
                        SelectionKey key = iter.next();
                        iter.remove();
                        if (key.isAcceptable()) {
                            // 前面只注册了连接事件,因此只要负责建立连接并将后续的任务分发给Worker就行
                            ServerSocketChannel c = (ServerSocketChannel) key.channel();
                            SocketChannel sc = c.accept();// 建立连接
                            sc.configureBlocking(false);
                            log.debug("{} connected", sc.getRemoteAddress());
                            // 分发给Worker来处理,这里是公平地轮询,即每个Worker公平循环领取任务去执行
                            // 因为每个Worker其实就是一个Selector,而每个Selector可以管理多个Channel(用户交互)
                            workers[index.getAndIncrement() % workers.length].register(sc);
                        }
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * WorkerReactor,主要负责读取用户发来的数据
     */

    @Slf4j
    static class WorkerEventLoop implements Runnable {
        private Selector workerSelector;
        private volatile boolean start = false;
        private int index;

        // 任务队列,存放可执行的命令,两个线程需要传参的话通过队列来实现执行逻辑解耦
        private final ConcurrentLinkedQueue<Runnable> tasks = new ConcurrentLinkedQueue<>();

        public WorkerEventLoop(int index) {
            this.index = index;
        }

        public void register(SocketChannel sc) throws IOException {
            if (!start) {
                workerSelector = Selector.open();
                // 启动一个新线程执行本类的run方法
                new Thread(this, "worker-" + index).start();
                start = true;
            }
            tasks.add(() -> {
                // 向任务队列中添加任务(即需要执行的指令)
                try {
                    SelectionKey sckey = sc.register(workerSelector, 0, null);
                    sckey.interestOps(SelectionKey.OP_READ);
                    workerSelector.selectNow();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            });
            // 唤醒Selector
            workerSelector.wakeup();
        }

        /**
         * WorkerReactor 的Handler处理器,负责读取用户发过来的数据
         */
        @Override
        public void run() {
            while (true) {
                try {
                    workerSelector.select();
                    // 从任务队列中获取一个任务并执行
                    Runnable task = tasks.poll();
                    if (task != null) {
                        task.run();
                    }
                    Set<SelectionKey> keys = workerSelector.selectedKeys();
                    Iterator<SelectionKey> iter = keys.iterator();
                    while (iter.hasNext()) {
                        SelectionKey key = iter.next();
                        if (key.isReadable()) {
                            // 读取客户端发生过来的数据
                            SocketChannel sc = (SocketChannel) key.channel();
                            ByteBuffer buffer = ByteBuffer.allocate(128);
                            try {
                                int read = sc.read(buffer);
                                if (read == -1) { // 如果-1则是用户断开连接触发的读事件
                                    key.cancel();
                                    sc.close();
                                } else {
                                    buffer.flip();
                                    log.debug("{} message:", sc.getRemoteAddress());
                                }
                            } catch (IOException e) {
                                e.printStackTrace();
                                key.cancel();
                                sc.close();
                            }
                        }
                        iter.remove();
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

总结

什么是Reactor?答:一个线程对应一个Selector模式的对象,Reactor模式其中BossReactor负责客户端连接与任务分发给WorkerReactor对象,WorkerReactor负责具体的数据发送与接受等操作。而各自所负责的任务也被叫做Handler(处理器)。相信看完上面的讲解和代码,你已经知道了什么是Reactor模式了!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1401362.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

PID笔记

Improving the Beginner’s PID 参考资料 Improving the Beginner’s PID – Introduction The Beginner’s PID 以下是每个人第一次学习的PID方程&#xff1a; 这导致几乎每个人都编写了以下PID控制器&#xff1a; /*working variables*/ unsigned long lastTime; double…

Mongo集群入门

一、前言 MongoDB 有三种集群架构模式&#xff0c;分别为主从复制&#xff08;Master-Slaver&#xff09;、副本集&#xff08;Replica Set&#xff09;和分片&#xff08;Sharding&#xff09;模式。 Master-Slaver 是一种主从复制的模式&#xff0c;目前已经不推荐使用。 Re…

vue.js js 雪花算法ID生成 vue.js之snowFlake算法

随着前端业务越来越复杂&#xff0c;自定义表单数据量比较大&#xff0c;每条数据的id生成则至关重要。想到前期IOS中实现的雪花算法ID&#xff0c;照着其实现JS版本&#xff0c;供大家学习参考。 一、库的建立引入 在你项目中创建一个snowFlake.js的文件&#xff1a;拷贝以下…

《Windows核心编程》若干知识点应用实战分享

目录 1、进程的虚拟内存分区与小于0x10000的小地址内存区 1.1、进程的虚拟内存分区 1.2、小于0x10000的小地址内存区 2、保存线程上下文的CONTEXT结构体 3、从汇编代码角度去理解多线程运行过程的典型实例 4、调用TerminateThread强制结束线程会导致线程中的资源没有释放…

大数据关联规则挖掘:Apriori算法的深度探讨

文章目录 大数据关联规则挖掘&#xff1a;Apriori算法的深度探讨一、简介什么是关联规则挖掘&#xff1f;什么是频繁项集&#xff1f;什么是支持度与置信度&#xff1f;Apriori算法的重要性应用场景 二、理论基础项和项集支持度&#xff08;Support&#xff09;置信度&#xff…

vue3相比vue2的效率提升

1、静态提升 2、预字符串化 3、缓存事件处理函数 4、Block Tree 5、PatchFlag 一、静态提升 在vue3中的app.vue文件如下&#xff1a; 在服务器中&#xff0c;template中的内容会变异成render渲染函数。 最终编译后的文件&#xff1a; 1.静态节点优化 那么这里为什么是两部分…

[SwiftUI]自定义滚动菜单栏进行PageView页面切换

如图&#xff0c;自定义一个菜单栏&#xff0c;要求点击菜单按钮和滚动翻页步调统一。 首先有个分类模型 import Foundationstruct CategoryModel: Hashable {var categoryID: Int 0var categoryName: String ""} 基础实现代码如下&#xff0c;点击菜单和滚动页面…

C++(14)——string的模拟实现

前几篇文章中介绍了关于以及其相关函数的使用&#xff0c;为了更清楚的了解这些函数的作用&#xff0c;本篇文章通过模拟实现的方式来加深对于函数作用原理的理解。 目录 1. String的整体框架&#xff1a; 1.1 成员变量&#xff1a; 1.2 构造函数&#xff1a; 1.3 析构函数…

2023年总结我所经历的技术大变革

&#x1f4e2;欢迎点赞 &#xff1a;&#x1f44d; 收藏 ⭐留言 &#x1f4dd; 如有错误敬请指正&#xff0c;赐人玫瑰&#xff0c;手留余香&#xff01;&#x1f4e2;本文作者&#xff1a;由webmote 原创&#x1f4e2;作者格言&#xff1a;新的征程&#xff0c;我们面对的不仅…

快速玩转 Mixtral 8x7B MOE大模型!阿里云机器学习 PAI 推出最佳实践

作者&#xff1a;熊兮、贺弘、临在 Mixtral 8x7B大模型是Mixtral AI推出的基于decoder-only架构的稀疏专家混合网络&#xff08;Mixture-Of-Experts&#xff0c;MOE&#xff09;开源大语言模型。这一模型具有46.7B的总参数量&#xff0c;对于每个token&#xff0c;路由器网络选…

Acwing 138 周赛 解题报告 | 珂学家 | 偏序 + DP构造

前言 整体评价 很久没做acwing周赛了, 之前vp过一些周赛&#xff0c;感觉风格变了。 这次感觉还可以&#xff0c;都是些眼熟的套路题。 A. 5458. 进水排水问题 思路: 签到题 按题意描述编写 import java.io.*; import java.util.*;public class Main {public static void …

解决 conda新建虚拟环境只有一个conda-meta文件&conda新建虚拟环境不干净

像以前一样通过conda 新建虚拟环境时发现环境一团糟&#xff0c;首先新建虚拟环境 conda create -n newenv这时候activate newenv&#xff0c;通过pip list&#xff0c;会发现有很多很多的包&#xff0c;都是我在其他环境用到的。但诡异的是&#xff0c;来到anaconda下env的目…

openEuler安装KVM

1、关闭防火墙和selinux [rootlocalhost ~]# systemctl stop firewalld[rootlocalhost ~]# setenforce 0 2、下载软件包 libvirt&#xff1a;用于管理虚拟化平台的开源的 API&#xff0c;后台程序和管理工具。 qemu&#xff1a;开源&#xff08;模拟&#xff09;软件&#…

【51单片机】IO 扩展(串转并)--74HC595

0、前言 参考&#xff1a; 普中 51 单片机开发攻略 第12章 【51单片机入门教程-2020版 程序全程纯手打 从零开始入门】 https://www.bilibili.com/video/BV1Mb411e7re/?p21&share_sourcecopy_web&vd_source77e36f24add8dc77c362748ffb980148 nop()是什么语句&#…

算法常用思路总结

思路 1. 求数组中最大最小值思路代码 2. 计算阶乘思路&#xff1a;代码&#xff1a; 3. 得到数字的每一位思路代码 4. 计算时间类型5. 最大公约数、最小公倍数6. 循环数组的思想题目&#xff1a;猴子选大王代码 补充经典例题1. 复试四则运算题目内容题解 2. 数列求和题目内容题…

8.1 Java与数据库连接_XML(❤)

8.1 Java与数据库连接_XML 1. XML介绍与用途2. XML语法规则3. XML语义约束3.1 DTD语法3.2 创建DTD文件3.3 XML Schema语法1. XML介绍与用途 2. XML语法规则

常见的代码生成器使用

常见的代码生成器使用 目录概述需求&#xff1a; 设计思路实现思路分析1.第一部分2.第二部分 参考资料和推荐阅读 Survive by day and develop by night. talk for import biz , show your perfect code,full busy&#xff0c;skip hardness,make a better result,wait for cha…

Django随笔

关于Django的admin 1. 在url中把 from django.contrib import admin 重新解开 把path(admin/,admin.site.urls), 解开 2. 注册app&#xff0c;在配置文件中写 django.contrib.admin, 3.输入命令进行数据库迁移 Django国际化 配置文件中&#xff08;改成中文&#xff09; LA…

【STM32F103】DMA直接存储器访问游戏摇杆模块(ADCDMAEXTI)

前言&#xff08;可忽略&#xff09; 当初下定决心要走嵌入式的时候买了一堆传感器&#xff0c;但是因为懒和忙所以闲置了一堆&#xff0c;今天考完了最后一门&#xff0c;所以打算一个个都玩一遍&#xff0c;今天先从这个摇杆开始&#xff0c;当初买这个是想着以后做个遥控小…

指标异常检测和诊断

检测 是发现问题 诊断 是找到原因 误差的分类 系统误差&#xff1a;系统误差是由于仪器本身不精确&#xff0c;或实验方法粗略&#xff0c;或实验原理不完善而产生的。随机误差&#xff1a;随机误差是由各种偶然因素对实验者、测量仪器、被测物理量的影响而产生的。粗大误差&…