IO流中「线程」模型总结

news2024/9/19 10:56:37

一、基础简介

在IO流的网络模型中,以常见的「客户端-服务端」交互场景为例;

客户端与服务端进行通信「交互」,可能是同步或者异步,服务端进行「流」处理时,可能是阻塞或者非阻塞模式,当然也有自定义的业务流程需要执行,从处理逻辑看就是「读取数据-业务执行-应答写数据」的形式;

Java提供「三种」IO网络编程模型,即:「BIO同步阻塞」、「NIO同步非阻塞」、「AIO异步非阻塞」;

二、同步阻塞

1、模型图解

BIO即同步阻塞,服务端收到客户端的请求时,会启动一个线程处理,「交互」会阻塞直到整个流程结束;

这种模式如果在高并发且流程复杂耗时的场景下,客户端的请求响应会存在严重的性能问题,并且占用过多资源; 

2、参考案例

服务端】启动ServerSocket接收客户端的请求,经过一系列逻辑之后,向客户端发送消息,注意这里线程的10秒休眠;

public class SocketServer01 {
    public static void main(String[] args) throws Exception {
        // 1、创建Socket服务端
        ServerSocket serverSocket = new ServerSocket(8080);
        // 2、方法阻塞等待,直到有客户端连接
        Socket socket = serverSocket.accept();
        // 3、输入流,输出流
        InputStream inStream = socket.getInputStream();
        OutputStream outStream = socket.getOutputStream();
        // 4、数据接收和响应
        int readLen = 0;
        byte[] buf = new byte[1024];
        if ((readLen=inStream.read(buf)) != -1){
            // 接收数据
            String readVar = new String(buf, 0, readLen) ;
            System.out.println("readVar======="+readVar);
        }
        // 响应数据
        Thread.sleep(10000);
        outStream.write("sever-8080-write;".getBytes());
        // 5、资源关闭
        IoClose.ioClose(outStream,inStream,socket,serverSocket);
    }
}

客户端】Socket连接,先向ServerSocket发送请求,再接收其响应,由于Server端模拟耗时,Client处于长时间阻塞状态;

public class SocketClient01 {
    public static void main(String[] args) throws Exception {
        // 1、创建Socket客户端
        Socket socket = new Socket(InetAddress.getLocalHost(), 8080);
        // 2、输入流,输出流
        OutputStream outStream = socket.getOutputStream();
        InputStream inStream = socket.getInputStream();
        // 3、数据发送和响应接收
        // 发送数据
        outStream.write("client-hello".getBytes());
        // 接收数据
        int readLen = 0;
        byte[] buf = new byte[1024];
        if ((readLen=inStream.read(buf)) != -1){
            String readVar = new String(buf, 0, readLen) ;
            System.out.println("readVar======="+readVar);
        }
        // 4、资源关闭
        IoClose.ioClose(inStream,outStream,socket);
    }
}

三、同步非阻塞

1、模型图解

NIO即同步非阻塞,服务端可以实现一个线程,处理多个客户端请求连接,服务端的并发能力得到极大的提升;

这种模式下客户端的请求连接都会注册到Selector多路复用器上,多路复用器会进行轮询,对请求连接的IO流进行处理;

2、参考案例

服务端】单线程可以处理多个客户端请求,通过轮询多路复用器查看是否有IO请求;

public class SocketServer01 {
    public static void main(String[] args) throws Exception {
        try {
            //启动服务开启监听
            ServerSocketChannel socketChannel = ServerSocketChannel.open();
            socketChannel.socket().bind(new InetSocketAddress("127.0.0.1", 8989));
            // 设置非阻塞,接受客户端
            socketChannel.configureBlocking(false);
            // 打开多路复用器
            Selector selector = Selector.open();
            // 服务端Socket注册到多路复用器,指定兴趣事件
            socketChannel.register(selector, SelectionKey.OP_ACCEPT);
            // 多路复用器轮询
            ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
            while (selector.select() > 0){
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> selectionKeyIter = selectionKeys.iterator();
                while (selectionKeyIter.hasNext()){
                    SelectionKey selectionKey = selectionKeyIter.next() ;
                    selectionKeyIter.remove();
                    if(selectionKey.isAcceptable()) {
                        // 接受新的连接
                        SocketChannel client = socketChannel.accept();
                        // 设置读非阻塞
                        client.configureBlocking(false);
                        // 注册到多路复用器
                        client.register(selector, SelectionKey.OP_READ);
                    } else if (selectionKey.isReadable()) {
                        // 通道可读
                        SocketChannel client = (SocketChannel) selectionKey.channel();
                        int len = client.read(buffer);
                        if (len > 0){
                            buffer.flip();
                            byte[] readArr = new byte[buffer.limit()];
                            buffer.get(readArr);
                            System.out.println(client.socket().getPort() + "端口数据:" + new String(readArr));
                            buffer.clear();
                        }
                    }
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

客户端】每隔3秒持续的向通道内写数据,服务端通过轮询多路复用器,持续的读取数据;

public class SocketClient01 {
    public static void main(String[] args) throws Exception {
        try {
            // 连接服务端
            SocketChannel socketChannel = SocketChannel.open();
            socketChannel.connect(new InetSocketAddress("127.0.0.1", 8989));
            ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
            String conVar = "client-hello";
            writeBuffer.put(conVar.getBytes());
            writeBuffer.flip();
            // 每隔3S发送一次数据
            while (true) {
                Thread.sleep(3000);
                writeBuffer.rewind();
                socketChannel.write(writeBuffer);
                writeBuffer.clear();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

四、异步非阻塞

1、模型图解

AIO即异步非阻塞,对于通道内数据的「读」和「写」动作,都是采用异步的模式,对于性能的提升是巨大的;

这与常规的第三方对接模式很相似,本地服务在请求第三方服务时,请求过程耗时很大,会异步执行,第三方第一次回调,确认请求可以被执行;第二次回调则是推送处理结果,这种思想在处理复杂问题时,可以很大程度的提高性能,节省资源:

2、参考案例

服务端】各种「accept」、「read」、「write」动作是异步,通过Future来获取计算的结果;

public class SocketServer01 {
    public static void main(String[] args) throws Exception {
        // 启动服务开启监听
        AsynchronousServerSocketChannel socketChannel = AsynchronousServerSocketChannel.open() ;
        socketChannel.bind(new InetSocketAddress("127.0.0.1", 8989));
        // 指定30秒内获取客户端连接,否则超时
        Future<AsynchronousSocketChannel> acceptFuture = socketChannel.accept();
        AsynchronousSocketChannel asyChannel = acceptFuture.get(30, TimeUnit.SECONDS);

        if (asyChannel != null && asyChannel.isOpen()){
            // 读数据
            ByteBuffer inBuffer = ByteBuffer.allocate(1024);
            Future<Integer> readResult = asyChannel.read(inBuffer);
            readResult.get();
            System.out.println("read:"+new String(inBuffer.array()));

            // 写数据
            inBuffer.flip();
            Future<Integer> writeResult = asyChannel.write(ByteBuffer.wrap("server-hello".getBytes()));
            writeResult.get();
        }

        // 关闭资源
        asyChannel.close();
    }
}

客户端】相关「connect」、「read」、「write」方法调用是异步的,通过Future来获取计算的结果;

public class SocketClient01 {
    public static void main(String[] args) throws Exception {
        // 连接服务端
        AsynchronousSocketChannel socketChannel = AsynchronousSocketChannel.open();
        Future<Void> result = socketChannel.connect(new InetSocketAddress("127.0.0.1", 8989));
        result.get();

        // 写数据
        String conVar = "client-hello";
        ByteBuffer reqBuffer = ByteBuffer.wrap(conVar.getBytes());
        Future<Integer> writeFuture = socketChannel.write(reqBuffer);
        writeFuture.get();

        // 读数据
        ByteBuffer inBuffer = ByteBuffer.allocate(1024);
        Future<Integer> readFuture = socketChannel.read(inBuffer);
        readFuture.get();
        System.out.println("read:"+new String(inBuffer.array()));

        // 关闭资源
        socketChannel.close();
    }
}

五、Reactor模型

1、模型图解

这部分内容,可以参考「Doug Lea的《IO》」文档,查看更多细节;

1.1 Reactor设计原理

Reactor模式基于事件驱动设计,也称为「反应器」模式或者「分发者」模式;服务端收到多个客户端请求后,会将请求分派给对应的线程处理;

 

Reactor:负责事件的监听和分发;Handler:负责处理事件,核心逻辑「read读」、「decode解码」、「compute业务计算」、「encode编码」、「send应答数据」;

1.2 单Reactor单线程

【1】Reactor线程通过select监听客户端的请求事件,收到事件后通过Dispatch进行分发;

【2】如果是建立连接请求事件,Acceptor通过「accept」方法获取连接,并创建一个Handler对象来处理后续业务;

【3】如果不是连接请求事件,则Reactor会将该事件交由当前连接的Handler来处理;

【4】在Handler中,会完成相应的业务流程;

这种模式将所有逻辑「连接、读写、业务」放在一个线程中处理,避免多线程的通信,资源竞争等问题,但是存在明显的并发和性能问题;

1.3 单Reactor多线程

【1】Reactor线程通过select监听客户端的请求事件,收到事件后通过Dispatch进行分发;

【2】如果是建立连接请求事件,Acceptor通过「accept」方法获取连接,并创建一个Handler对象来处理后续业务;

【3】如果不是连接请求事件,则Reactor会将该事件交由当前连接的Handler来处理;

【4】在Handler中,只负责事件响应不处理具体业务,将数据发送给Worker线程池来处理;

【5】Worker线程池会分配具体的线程来处理业务,最后把结果返回给Handler做响应;

这种模式将业务从Reactor单线程分离处理,可以让其更专注于事件的分发和调度,Handler使用多线程也充分的利用cpu的处理能力,导致逻辑变的更加复杂,Reactor单线程依旧存在高并发的性能问题;

1.4 主从Reactor多线程

【1】 MainReactor主线程通过select监听客户端的请求事件,收到事件后通过Dispatch进行分发;

【2】如果是建立连接请求事件,Acceptor通过「accept」方法获取连接,之后MainReactor将连接分配给SubReactor;

【3】如果不是连接请求事件,则MainReactor将连接分配给SubReactor,SubReactor调用当前连接的Handler来处理;

【4】在Handler中,只负责事件响应不处理具体业务,将数据发送给Worker线程池来处理;

【5】Worker线程池会分配具体的线程来处理业务,最后把结果返回给Handler做响应;

这种模式Reactor线程分工明确,MainReactor负责接收新的请求连接,SubReactor负责后续的交互业务,适应于高并发的处理场景,是Netty组件通信框架的所采用的模式;

2、参考案例

服务端】提供两个EventLoopGroup,「ParentGroup」主要是用来接收客户端的请求连接,真正的处理是转交给「ChildGroup」执行,即Reactor多线程模型;

@Slf4j
public class NettyServer {
    public static void main(String[] args) {
        // EventLoop组,处理事件和IO
        EventLoopGroup parentGroup = new NioEventLoopGroup();
        EventLoopGroup childGroup = new NioEventLoopGroup();
        try {
            // 服务端启动引导类
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(parentGroup, childGroup)
                    .channel(NioServerSocketChannel.class).childHandler(new ServerChannelInit());

            // 异步IO的结果
            ChannelFuture channelFuture = serverBootstrap.bind(8989).sync();
            channelFuture.channel().closeFuture().sync();
        } catch (Exception e){
            e.printStackTrace();
        } finally {
            parentGroup.shutdownGracefully();
            childGroup.shutdownGracefully();
        }
    }
}

class ServerChannelInit extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel socketChannel) {
        // 获取管道
        ChannelPipeline pipeline = socketChannel.pipeline();
        // 编码、解码器
        pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
        pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
        // 添加自定义的handler
        pipeline.addLast("serverHandler", new ServerHandler());
    }
}

class ServerHandler extends ChannelInboundHandlerAdapter {
    /**
     * 通道读和写
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("Server-Msg【"+msg+"】");
        TimeUnit.MILLISECONDS.sleep(2000);
        String nowTime = DateTime.now().toString(DatePattern.NORM_DATETIME_PATTERN) ;
        ctx.channel().writeAndFlush("hello-client;time:" + nowTime);
        ctx.fireChannelActive();
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

客户端】通过Bootstrap类,与服务器建立连接,服务端通过ServerBootstrap启动服务,绑定在8989端口,然后服务端和客户端进行通信;

public class NettyClient {
    public static void main(String[] args) {
        // EventLoop处理事件和IO
        NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        try {
            // 客户端通道引导
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventLoopGroup)
                    .channel(NioSocketChannel.class).handler(new ClientChannelInit());

            // 异步IO的结果
            ChannelFuture channelFuture = bootstrap.connect("localhost", 8989).sync();
            channelFuture.channel().closeFuture().sync();
        } catch (Exception e){
            e.printStackTrace();
        } finally {
            eventLoopGroup.shutdownGracefully();
        }
    }
}

class ClientChannelInit extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel socketChannel) {
        // 获取管道
        ChannelPipeline pipeline = socketChannel.pipeline();
        // 编码、解码器
        pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
        pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
        // 添加自定义的handler
        pipeline.addLast("clientHandler", new ClientHandler());
    }
}

class ClientHandler extends ChannelInboundHandlerAdapter {
    /**
     * 通道读和写
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("Client-Msg【"+msg+"】");
        TimeUnit.MILLISECONDS.sleep(2000);
        String nowTime = DateTime.now().toString(DatePattern.NORM_DATETIME_PATTERN) ;
        ctx.channel().writeAndFlush("hello-server;time:" + nowTime);
    }
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.channel().writeAndFlush("channel...active");
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

六、参考源码

编程文档:
https://gitee.com/cicadasmile/butte-java-note

应用仓库:
https://gitee.com/cicadasmile/butte-flyer-parent

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/825248.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

12.其他事件

12.1 页面加载事件 加载外部资源&#xff08;如图片、外联CSS和JavaScript等&#xff09;加载完毕时触发的事件 1.事件名&#xff1a;load ●监听页面所有资源加载完毕&#xff1a; ➢给window添加load事件 //页面加载事件 window.addEventListener( load, function () { //…

java后端富文本转word,再传递到浏览器下载。

思路参考&#xff0c;以及所有的工具类都使用了》牧羊人大佬的代码《 有帮助的话不用给到我点赞&#xff0c;给大佬点赞即可 这是前端代码&#xff0c;必须使用get。 post后端返回的流浏览器接收不到&#xff08;具体原因不详&#xff09;。get无法传递requestBody&#xff0c;…

Python实现GA遗传算法优化BP神经网络分类模型(BP神经网络分类算法)项目实战

说明&#xff1a;这是一个机器学习实战项目&#xff08;附带数据代码文档视频讲解&#xff09;&#xff0c;如需数据代码文档视频讲解可以直接到文章最后获取。 1.项目背景 遗传算法&#xff08;Genetic Algorithm&#xff0c;GA&#xff09;最早是由美国的 John holland于20世…

STM32F4_内存管理(Malloc、Free)

目录 前言 1. 内存管理介绍 1.1 分块式内存管理 2. 实验程序 2.1 main.c 2.2 Malloc.c 2.3 Malloc.h 前言 相信大家在学习C语言的过程中&#xff0c;都会学习到 malloc 动态开辟函数和 free 释放内存函数&#xff1b;这两个函数带给我们的优越性是&#xff1a; 我们在使…

[深度学习] GPU处理能力(TFLOPS/TOPS)

计算能力换算 理论峰值 &#xff1d; GPU芯片数量GPU Boost主频核心数量*单个时钟周期内能处理的浮点计算次数 只不过在GPU里单精度和双精度的浮点计算能力需要分开计算&#xff0c;以最新的Tesla P100为例&#xff1a; 双精度理论峰值 &#xff1d; FP64 Cores &#xff0a;…

Scratch Blocks自定义组件之「下拉图标」

一、背景 由于自带的下拉图标是给水平布局的block使用&#xff0c;放在垂直布局下显得别扭&#xff0c;而且下拉选择后回修改image字段的图片&#xff0c;这让我很不爽&#xff0c;所以在原来的基础上稍作修改&#xff0c;效果如下&#xff1a; 二、使用说明 &#xff08;1&am…

转机来了,国内全新芯片技术取得突破,关键驱动引擎开始提速

芯片技术转机来了 我们都知道&#xff0c;芯片技术是现代信息技术的基石&#xff0c;它驱动着计算机、智能手机、物联网设备等各类电子设备的运行。 科技的不断进步&#xff0c;芯片技术也在不断演进。 从传统的集成电路到现代的微处理器和系统芯片&#xff0c;其计算能力和能…

Total Variation loss

Total Variation loss 适合任务 图像复原、去噪等 处理的问题 图像上的一点点噪声可能就会对复原的结果产生非常大的影响&#xff0c;很多复原算法都会放大噪声。因此需要在最优化问题的模型中添加一些正则项来保持图像的光滑性&#xff0c;图片中相邻像素值的差异可以通过…

Pytorch深度学习框架入门

1.pytorch加载数据 唤醒指定的python运行环境的命令&#xff1a; conda activate 环境的名称 from torch.utils.data import Dataset #Dataset数据处理的包 from PIL import Image import os#定义数据处理的类 class MyData(Dataset):#数据地址处理方法def __init__(self,ro…

从《信息技术服务数据中心业务连续性等级评价准则》看数据备份

​​​​​​​ 5月23日&#xff0c;国家标准化管理委员会与国家市场监督管理总局发布了《信息技术服务数据中心业务连续性等级评价准则》&#xff0c;旨在适应各行各业逐步深入的数字化转型&#xff0c;提升全社会对数据中心服务中断风险的重视。 信息技术服务数据中心业务连续…

KL15 是什么?ACC,crank,on等

KL含义 KL is the abbreviation for klemme which is the German term for connector / connection.KL是“ klemme”的缩写&#xff0c;这是德语中连接器或连接的术语。 KL30 &#xff0c;通常表示电瓶的正极。positive KL31&#xff0c;通常表示电瓶的负极。negative KL15, 通…

【NLP概念源和流】 04-过度到RNN(第 4/20 部分)

接上文 【NLP概念源和流】 03-基于计数的嵌入,GloVe(第 3/20 部分) 一、说明 词嵌入使许多NLP任务有了显著的改进。它对单词原理图的理解以及将不同长度的文本表示为固定向量的能力使其在许多复杂的NLP任务中非常受欢迎。大多数机器学习算法可以直接应用于分类和回归任务的…

go初识iris框架(三) - 路由功能处理方式

继了解get,post后 package mainimport "github.com/kataras/iris/v12"func main(){app : iris.New()//app.Handle(请求方式,url,请求方法)app.Handle("GET","/userinfo",func(ctx iris.Context){path : ctx.Path()app.Logger().Info(path) //获…

MTS性能监控你知道多少

前言 说到MySQL的MTS&#xff0c;相信很多同学都不陌生&#xff0c;从5.6开始基于schema的并行回放&#xff0c;到5.7的LOGICAL_CLOCK支持基于事务的并行回放&#xff0c;这些内容都有文章讲解&#xff0c;在本篇文章不再赘述。今天要讲的是&#xff0c;你知道如何查看并行回放…

最新AI系统ChatGPT网站源码/支持GPT4.0/GPT联网功能/支持ai绘画/mj以图生图/支持思维导图生成

使用Nestjs和Vue3框架技术&#xff0c;持续集成AI能力到系统&#xff01; 同步mj图片重新生成指令 同步 Vary 指令 单张图片对比加强 Vary(Strong) | Vary(Subtle) 同步 Zoom 指令 单张图片无限缩放 Zoom out 2x | Zoom out 1.5x 新增GPT联网提问功能、签到功能 一、功能演示 …

基于springboot生鲜物流系统-计算机毕设 附源码13339

springboot生鲜物流系统 摘要 生鲜产品易于腐烂、难贮存、不易长时间运输&#xff0c;生产者所面临的市场风险很大&#xff0c;很多生鲜产品无法实现“货畅其流”和“物尽其值”&#xff0c;适宜的生鲜产品物流体系就显得尤为重要。本文将广东省生鲜产品物流体系的构建作为一个…

删除链表中等于给定值 val 的所有节点

203. 移除链表元素 - 力扣&#xff08;LeetCode&#xff09; 给出链表 1->2->3->3->4->5->3, 和 val 3, 你需要返回删除3之后的链表&#xff1a;1->2->4->5。 分析思路&#xff1a;这道题的思路&#xff0c;与之前删除链表中重复的结点相似。 因…

腾讯云从业者认证考试考点——云网络产品

文章目录 腾讯云网络产品功能网络产品概述负载均衡&#xff08;Cloud Load Balancer&#xff09;私有网络&#xff08;Virtual Private Cloud&#xff0c;VPC&#xff09;专线接入弹性网卡&#xff08;多网卡热插拔服务&#xff09;NAT网关&#xff08;NAT Gateway&#xff09;…

了解 spring MVC + 使用spring MVC - springboot

前言 本篇介绍什么是spring MVC &#xff0c;如何使用spring MVC&#xff0c;了解如何连接客户端与后端&#xff0c;如何从前端获取各种参数&#xff1b;如有错误&#xff0c;请在评论区指正&#xff0c;让我们一起交流&#xff0c;共同进步&#xff01; 文章目录 前言1. 什么…

RD算法(四)登堂入室 —— 成像完成

SAR成像专栏目录_lightninghenry的博客-CSDN博客https://lightning.blog.csdn.net/article/details/122393577?spm=1001.2014.3001.5502先放RD算法最终的成像结果: 经简单的地距投影后为(地距投影的内容在后面的几何校正章节中讲解): 温哥华这地形还真像是一张怪兽的巨嘴呀…