Java 高并发编程——Reactor模式(多线程)

news2025/1/11 17:06:58

1 多线程版本的Reactor模式演进

        Reactor和Handler挤在一个单线程中会造成非常严重的性能缺陷,可以使用多线程来对基础的Reactor模式进行改造。

        多线程Reactor的演进分为两个方面:

        1、升级Handler。既要使用多线程,又要尽可能高效率,则可以考虑使用线程池。

        2、升级Reactor。可以考虑引入多个Selector(选择器),提升选择大量通道的能力。

        总体来说,多线程版本的Reactor模式大致如下:

        (1)将负责数据传输处理的IOHandler处理器的执行放入独立的线程池中。这样,业务处理线程与负责新连接监听的反应器线程就能相互隔离,避免服务器的连接监听收到阻塞。

        (2)如果服务器为多核CPU,可以将反应器线程拆分为多个子反应器(SubReactor)线程;同时,引入多个选择器,并且为每一个SubReactor引入一个线程,一个线程负责一个选择器的事件轮询。这样充分释放了系统资源的能力,也大大提升了反应器管理大量连接或者监听大量传输通道的能力。

2 多线程版本Reactor的实战案例

        在上一篇中:Java 高并发编程——Reactor模式(单线程)的基础上完成多线程反应器的升级。多线程反应器的实战案例设计如下:

        1、引入多个选择器。

        2、设计一个新的子反应器(SubReactor)类,子反应器负责查询一个选择器。

        3、开发多个处理线程,一个处理器负责执行一个子反应器。

        为了提升效率,可以让SubReactor的数量和选择器的数量一致,避免多个线程负责一个选择器,导致需要进行线程同步,引起效率低下。

        多线程版本反应器MultiThreadEchoServerReactor的逻辑模型如下:

        多线程版本反应器MultiThreadEchoServerReactor的代码如下:

public class MultiThreadEchoServerReactor {
    ServerSocketChannel serverSocketChannel;
    AtomicInteger next = new AtomicInteger(0);
    Selector bollSelector = null;
    Reactor bossReactor = null;
    //selectors集合,引入多个selector选择器
    Selector[] workSelectors = new Selector[2];
    //引入多个子反应器
    Reactor[] workReactors = null;

    MultiThreadEchoServerReactor() throws IOException{
        //初始化多个selector选择器
        bollSelector = Selector.open();//用于监听新连接
        workSelectors[0] = Selector.open();//用于监听read,write事件
        workSelectors[1] = Selector.open();//用于监听read,write事件
        serverSocketChannel = ServerSocketChannel.open();
        InetSocketAddress address = new InetSocketAddress("127.0.0.1",8080);
        serverSocketChannel.socket().bind(address);
        serverSocketChannel.configureBlocking(false);
        //bossSelector 负责监听新连接事件,将serverSocketChannel注册到bossSelector
        SelectionKey sk = serverSocketChannel.register(bollSelector,SelectionKey.OP_ACCEPT);
        //绑定Handler:新连接监控handler绑定到SelectionKey(选择键)
        sk.attach(new AcceptorHandler());
        //bossSelector 反应器,处理新连接的bossSelector
        bossReactor = new Reactor(bollSelector);
        //第一个子反应器,一个子反应器负责一个worker选择器
        Reactor workReactor1 = new Reactor(workSelectors[0]);
        Reactor workReactor2 = new Reactor(workSelectors[1]);
        workReactors = new Reactor[]{workReactor1,workReactor2};
    }

    public MultiThreadEchoServerReactor( Selector bollSelector,ServerSocketChannel serverSocketChannel,) {
        this.serverSocketChannel = serverSocketChannel;
        this.bollSelector = bollSelector;
    }

    public void startServer(){
        new Thread(bossReactor).start();
        new Thread(workReactors[0]).start();
        new Thread(workReactors[1]).start();
    }
    //反应器
    class Reactor implements Runnable{
        private Selector selector;

        public Reactor(Selector selector) {
            this.selector = selector;
        }

        @Override
        public void run() {
            try {
                while(!Thread.interrupted()){
                    selector.select(1000);
                    Set<SelectionKey> selectededKeys = selector.selectedKeys();
                    if(null == selectededKeys | selectededKeys.size() == 0){
                        continue;
                    }
                    Iterator<SelectionKey> it = selectededKeys.iterator();
                    while(it.hasNext()){
                        SelectionKey sk = it.next();
                        dispatch(sk);
                    }
                    selectededKeys.clear();
                }
            }catch (IOException e){
                e.printStackTrace();
            }
        }
    }
    void dispatch(SelectionKey selectionKey){
        Runnable handler = (Runnable) selectionKey.attachment();
        //调用之前attach绑定到选择键的handler处理器对象
        if(handler != null){
            handler.run();
        }
    }
    //Handler:新连接处理器
    class AcceptorHandler implements Runnable{

        @Override
        public void run() {
            try {
                SocketChannel socketChannel = serverSocketChannel.accept();
                Logger.info("接收到一个新的连接");
                if(socketChannel != null){
                    int index = next.get();
                    Logger.info("选择器的编号:" + index);
                    Selector selector = workSelectors[index];
                    new MultiThreadEchoHandler(selector,socketChannel);
                }
            }catch (IOException e){
                e.printStackTrace();
            }
            if(next.incrementAndGet() == workSelectors.length){
                next.set(0);
            }
        }
    }
    public static void main(String[] args) throws IOException {
        MultiThreadEchoServerReactor server =
                new MultiThreadEchoServerReactor();
        server.startServer();
    }
}

class MultiThreadEchoHandler implements Runnable {
    final SocketChannel channel;
    final SelectionKey sk;
    final ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
    static final int RECIEVING = 0, SENDING = 1;
    int state = RECIEVING;
    //引入线程池
    static ExecutorService pool = Executors.newFixedThreadPool(4);

    MultiThreadEchoHandler(Selector selector, SocketChannel c) throws IOException {
        channel = c;
        channel.configureBlocking(false);
        channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
        //仅仅取得选择键,后设置感兴趣的IO事件
        sk = channel.register(selector, 0);
        //将本Handler作为sk选择键的附件,方便事件dispatch
        sk.attach(this);
        //向sk选择键注册Read就绪事件
        sk.interestOps(SelectionKey.OP_READ);
        //唤醒 查询线程,使得OP_READ生效
        selector.wakeup();
        Logger.info("新的连接 注册完成");

    }

    public void run() {
        //异步任务,在独立的线程池中执行
        //提交数据传输任务到线程池
        //使得IO处理不在IO事件轮询线程中执行,在独立的线程池中执行
        pool.execute(new AsyncTask());
    }

    //异步任务,不在Reactor线程中执行
    //数据传输与业务处理任务,不在IO事件轮询线程中执行,在独立的线程池中执行
    public synchronized void asyncRun() {
        try {
            if (state == SENDING) {
                //写入通道
                channel.write(byteBuffer);

                //写完后,准备开始从通道读,byteBuffer切换成写模式
                byteBuffer.clear();
                //写完后,注册read就绪事件
                sk.interestOps(SelectionKey.OP_READ);
                //写完后,进入接收的状态
                state = RECIEVING;
            } else if (state == RECIEVING) {
                //从通道读
                int length = 0;
                while ((length = channel.read(byteBuffer)) > 0) {
                    Logger.info(new String(byteBuffer.array(), 0, length));
                }
                //读完后,准备开始写入通道,byteBuffer切换成读模式
                byteBuffer.flip();
                //读完后,注册write就绪事件
                sk.interestOps(SelectionKey.OP_WRITE);
                //读完后,进入发送的状态
                state = SENDING;
            }
            //处理结束了, 这里不能关闭select key,需要重复使用
            //sk.cancel();
        } catch (IOException ex) {
            ex.printStackTrace();
        }
    }

    //异步任务的内部类
    class AsyncTask implements Runnable {
        public void run() {
            MultiThreadEchoHandler.this.asyncRun();
        }
    }

}

3 Reactor模式的优缺点

        在总结Reactor模式的优缺点之前,先看看Reactor模式和其他模式的对比。

        (1)Reactor模式与生产者消费者模式对比

        二者的相似之处:在一定程度上,Reactor模式优点类似生产者消费者模式。在生产消费者模式中,一个或多个生产者将事件加入一个队列中,一个或多个消费者主动从这个队列中拉去事件来处理。

        二者的不同之处:Reactor模式是基于查询的,没有专门的队列去缓冲存储IO事件,查到IO事件之后,反应器会根据不同的IO选择器将其分发给对应的Handler来处理。

        (2)Reactor模式与观察者模式对比

        二者的相似之处:在Reactor模式中,当查询到IO时间后,服务处理程序使用单路/多路分发策略,同步分发这些IO事件。观察者模式也被称为发布/订阅模式,它定义了一种依赖关系,让多个观察者同时监听同一个主题(topic)。这个主题对象在状态发生变化时会通知所有观察者,它们能够执行相应的处理。

        二者的不同之处:在Reactor模式中,Handler实例和IO事件的订阅关系基本上是一个事件绑定到一个Handler,每一个IO事件被查询后,反应器会将事件分发给所绑定的Handler,也就是一个事件只能被一个Handler处理;在观察者模式中,同一个时刻、同一个主题可以被订阅过的多个观察者处理。

        最后,总结一下Reactor模式的优缺点。作为高性能的IO模式,Reactor模式的优点如下:

        1、响应快,虽然同一反应器线程本身是同步的,但是不会被单个连接的IO操作所阻塞。

        2、编程相对简单,最大限度避免了复杂的多线程同步,也避免了多线程各个进程之间切换的开销。

        3、可扩展,可以方便地通过反应器线程的个数来充分利用CPU资源。

        Reactor模式的缺点如下:

        1、Reactor模式依赖于操作系统底层的IO多路复用系统调用的支持,如Linux中的epoll系统调用。如果操作系统的底层不支持IO多路复用,Reactor模式不会那么高效。

        2、在同一个Handler业务线程中,如果出现了一个长时间的数据读写,就会影响着反应器中其他通道的IO处理。例如,在大文件传输时,IO操作就会影响其他客户端的响应时间。对于这种操作,还需要进一步对Reactor模式进行改进。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1419758.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

springboot第53集:微服务分布式架构,docker-compose,Prometheus,mqtt监控体系周刊

从0搭建一套Prometheus监控体系 简介&#xff1a; Prometheus是一款开源监控系统&#xff0c;起源于SoundCloud的警告工具包。自2012年以来&#xff0c;许多公司和组织开始广泛采用Prometheus。该项目具有活跃的开发人员和用户社区&#xff0c;吸引越来越多的参与者。如今&…

网站地址怎么改成HTTPS?

现在&#xff0c;所有类型的网站都需要通过 HTTPS 协议进行安全连接&#xff0c;而实现这一目标的唯一方法是使用 SSL 证书。如果您不将 HTTP 转换为 HTTPS&#xff0c;浏览器和应用程序会将您网站的连接标记为不安全。 但用户询问如何将我的网站从 HTTP 更改为 HTTPS。在此页…

springBoot - mybatis 多数据源实现方案

应用场景: 多数据源 小型项目 或者 大项目的临时方案中比较常用.在日常开发中,可能我们需要查询多个数据库,但是数据库实例不同,导致不能通过 指定schema的方式 区分不同的库, 这种情况下就需要我们应用程序配置多数据源 实现方式: 首先自定义实现 datasource数据源 为当前…

HT UI 5.0,前端组件图扑是认真的!

为顺应数字时代的不断发展&#xff0c;图扑 HT UI 5.0 在原有功能强大的界面组件库的基础上进行了全面升级&#xff0c;融入了更先进的技术、创新的设计理念以及更加智能的功能。HT UI 5.0 使用户体验更为直观、个性化&#xff0c;并在性能、稳定性和安全性等方面达到新的高度。…

【Spring实战】32 Spring Boot3 集成 Nacos 服务注册中心 并在 Gateway 网关中应用

文章目录 1. 定义2. 背景3. 功能和特性4. 下载安装5. 服务启动6. 使用示例1&#xff09;服务提供者2&#xff09;服务消费者3&#xff09;测试 7. 代码参考结语 1. 定义 Nacos 是 Dynamic Naming and Configuration Service 的首字母简称&#xff0c;一个更易于构建云原生应用…

自动化测试——selenium工具(web自动化测试)

1、自动化测试 优点&#xff1a;通过自动化测试有效减少人力的投入&#xff0c;同时提高了测试的质量和效率。 也用于回归测试。随着版本越来越多&#xff0c;版本回归的压力越来越大&#xff0c;仅仅通过人工测试 来回归所以的版本肯定是不现实的&#xff0c;所以…

薅运营商羊毛?封杀!

最近边小缘在蓝点网上看到一则消息 “浙江联通也开始严格排查PCDN和PT等大流量行为 被检测到可能会封停宽带”。 此前中国联通已经在四川和上海等多个省市严查家庭宽带 (部分企业宽带也被查) 使用 PCDN 或 PT&#xff0c;当用户的宽带账户存在大量上传数据的情况&#xff0c;中…

MySQL安全(一)权限系统

一、授权 1、创建用户 在MySQL中&#xff0c;管理员可以通过以下命令创建用户&#xff1a; namelocalhost IDENTIFIED BY password; name是要创建的用户名&#xff0c;localhost表示该用户只能从本地连接到MySQL&#xff0c;password是该用户的密码。如果要允许该用户从任何…

明亮成长:新生儿补充维生素A的关键

引言&#xff1a; 新生儿时期是生命最为脆弱而又充满希望的阶段&#xff0c;而维生素A作为生命的必需营养素之一&#xff0c;在新生儿的成长中扮演着至关重要的角色。本文将深入探讨维生素A的作用、补充时机&#xff0c;以及在补充维生素A时应该注意的事项&#xff0c;为家长提…

uniapp H5 touchstart touchend 切换背景会失效,或者没用

uniapp H5 touchstart touchend 切换背景会失效&#xff0c;或者没用 直接上代码 &#xff08;使用 class 以及 hover-class来设置样式&#xff09; class 设置默认的背景图或者样式 hover-class 来设置按下的背景图 或者样式 抬起 按下 <view class"mp_zoom_siz…

如何在群晖中本地部署WPS Office并实现公网远程访问

文章目录 1. 拉取WPS Office镜像2. 运行WPS Office镜像容器3. 本地访问WPS Office4. 群晖安装Cpolar5. 配置WPS Office远程地址6. 远程访问WPS Office小结 7. 固定公网地址 wps-office是一个在Linux服务器上部署WPS Office的镜像。它基于WPS Office的Linux版本&#xff0c;通过…

工业交换机厂家:社区和行业的坚实支柱

工业交换机厂家在现代社会中扮演着重要的角色&#xff0c;不仅在技术创新和产品质量上起着关键作用&#xff0c;而且在社区和整个行业中承担着诸多责任和义务。作为一家致力于服务社区和推动行业发展的工业交换机厂家&#xff0c;厚石网络深知自己的责任和使命&#xff0c;不断…

JVM系列——垃圾收集器

对象存活判断 引用计数法 在对象中添加一个引用计数器&#xff0c;每当有一个地方引用它时&#xff0c;计数器值就加一&#xff1b;当引用失效时&#xff0c;计数器值就减一&#xff1b;任何时刻计数器为零的对象就是不可能再被使用的。 可达性分析算法 通过一系列称为“GC …

Flask框架小程序后端分离开发学习笔记《5》简易服务器代码

Flask框架小程序后端分离开发学习笔记《5》 Flask是使用python的后端&#xff0c;由于小程序需要后端开发&#xff0c;遂学习一下后端开发。 简易服务器代码 接口解析那一块很关键&#xff0c;学后端服务器这一块&#xff0c;感觉主要就是学习相应地址的接口怎么处理。 然后…

30s学会JAVA几个关键词

1.final&#xff08;最终&#xff09; 修饰类-》此类无法被继承 修饰方法-》该方法不可被重写 修饰属性和局部变量-》看作常量&#xff0c;赋值位置&#xff1a;显式初始化&#xff0c;代码块初始化&#xff0c;构造器初始化 2.super(继承子类可用) 1.在子类方法或构造器中…

Python+uiautomator2 框架搭建

一、安装整体步骤 01 开发环境安装 jdk安装&#xff08;version "1.8.0_361"&#xff09;python安装 &#xff08;Python37&#xff09;python编辑器安装 &#xff08;PyCharm2021&#xff09; 02 运行环境安装 adb安装 &#xff08;Android Debug Bridge versio…

硬件知识(2) 手机的传感器-sensor

#灵感# 看看小米在干啥 手机型号&#xff1a;Redmi Note 13 Pro&#xff0c;解读一下它宣传的手机卖点。 目录 宣传1&#xff1a;1/1.4" 大底&#xff0c;f/1.65 大光圈&#xff0c; 宣传2&#xff1a;支持 2 亿像素超清直出&#xff0c;分辨率高达 16320 x 12240 宣…

2024.1.29 GNSS 学习笔记

1.假设只对4颗卫星进行观测定位&#xff0c;卫星的截止高度角是15&#xff0c;那么如何布设这四颗卫星的位置&#xff0c;使其围成的四面体的体积得到最大&#xff0c;以获得最好定位精度&#xff1f; 答&#xff1a;3颗卫星均匀分布在最低仰角面上&#xff0c;第4颗卫星在测站…

对前端限流操作(无Redis版本)

如何限制前端的请求次数 最近学习缓存击穿的时候&#xff0c;解决方法是限流&#xff0c;前端限制请求次数。故通过后端来对前端的请求做限流次数。 这里首先不用redis方法&#xff0c;这里采用通过Aop切面的方式来限制请求次数 创建限流注解 /*** 限流接口*/ Retention(Re…

Windows XP x86 sp3 安装 Google Chrome 49.0.2623.112 (正式版本) (32 位)

1 下载地址&#xff1b; https://dl.google.com/release2/h8vnfiy7pvn3lxy9ehfsaxlrnnukgff8jnodrp0y21vrlem4x71lor5zzkliyh8fv3sryayu5uk5zi20ep7dwfnwr143dzxqijv/49.0.2623.112_chrome_installer.exe 2 直接 双击 49.0.2623.112_chrome_installer.exe 安装&#xff1b; 3 …