nio使用

news2025/3/3 6:07:58

NIO : new Input/Output,,在java1.4中引入的一套新的IO操作API,,,旨在替代传统的IO(即BIO:Blocking IO),,,nio提供了更高效的 文件和网络IO的 操作,,

NIO中分为阻塞模式(Blocking)和非阻塞模式(Non-blocking),,通过configureBlocking(boolean)方法设置,

  • 阻塞模式:

    • I/O 操作阻塞线程: read() :如果没有数据可读,调用会一直阻塞等待数据
      write(): 如果网络缓冲区已经满了,会一直阻塞,直到缓冲区有位置 ,
      accept(): 会一直等待客户端连接
      这些操作都需要一个线程去维持,如果是高并发项目,线程池会打满,,
      与传统的BIO(blocking IO)类似,只是 底层实现更高效
  • 非阻塞模式 (多路复用)
    一般会和Selector一起使用, Selector是NIO中的一个关键组件,,可以监听多个通道触发的事件

事件的类型:

  • accept : 客户端发起连接请求时触发
  • connect : 连接建立触发的事件
  • read : 可读事件,读数据的时候触发,,或者在 客户端主动断开连接,或者客户端异常断开连接触发
  • write : 写入事件,在需要写出数据并且缓冲区有写入位置的时候触发

Selector去建立和channel的关联,,并且监听你想关注的事件,,,当事件被触发之后,selector.select() 就会往下运行,如果没有事件发生,就会阻塞在那里,,
如果有事件发生,可以通过 selector.selectedKeys() 获取到所有的事件SelectionKey,遍历并处理这些事件,,

这个selector.selectedKeys()获取到的事件,,并不会主动移除,,需要在处理完这个事件之后,手动移除,,否则在下一次遍历事件的时候,还会再遍历一次

遇到的问题:

  • 客户端向服务端发送了大量的数据,,read()事件,去读数据的ByteBuffer大小是有限制的,,就可能会产生黏包(多个数据黏到一起,需要拆解数据)和半包(一个数据只发了一部分,需要根据另一部分组装数据),,,如果一个数据很大,设置的ByteBuffer读不完,就需要ByteBuffer扩容,,
    每一个channel都需要一个自己的buffer,,这样数据才不会乱,,就可以将buffer设置在附件中:
    ServerSocketChannel channel = (ServerSocketChannel) key.channel();
                    SocketChannel sc = channel.accept();
                    System.out.println("有客户端连接了"+sc);

                    sc.configureBlocking(false);
                    // 第三个参数就是 附件,,, 一个selectionKey 对应一个 附件,,,将buffer写入附件
                    ByteBuffer buffer = ByteBuffer.allocate(4); // attachment 附件

                    SelectionKey selectionKey = sc.register(selector, 0, buffer);
                    selectionKey.interestOps(SelectionKey.OP_READ);

当这个buffer不够用,需要扩容,扩容完了之后使用attch()放入新的附件,,attchment()获取附件

public class Server {

    public static void main(String[] args) throws IOException {

        Selector selector = Selector.open();

        ServerSocketChannel ssc = ServerSocketChannel.open();

        ssc.configureBlocking(false);

        SelectionKey sscKey = ssc.register(selector, 0, null);

        sscKey.interestOps(SelectionKey.OP_ACCEPT);


        ssc.bind(new InetSocketAddress(8080));
        System.out.println("server start ");

        while (true){
            selector.select();

            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();

            while (iterator.hasNext()){
                SelectionKey key = iterator.next();
                iterator.remove();

                if (key.isAcceptable()){
                    ServerSocketChannel channel = (ServerSocketChannel) key.channel();
                    SocketChannel sc = channel.accept();
                    System.out.println("有客户端连接了"+sc);

                    sc.configureBlocking(false);
                    // 第三个参数就是 附件,,, 一个selectionKey 对应一个 附件,,,将buffer写入附件
                    ByteBuffer buffer = ByteBuffer.allocate(4); // attachment 附件

                    SelectionKey selectionKey = sc.register(selector, 0, buffer);
                    selectionKey.interestOps(SelectionKey.OP_READ);


                }else if (key.isReadable()){
                    try {
                        //
                        SocketChannel channel = (SocketChannel) key.channel();
                        // 拿到附件
                        ByteBuffer buffer = (ByteBuffer) key.attachment();
                        // -1 表示客户端断开
                        int read = channel.read(buffer);
                        if (read == -1){
                            key.cancel();
                        }else{
                            boolean isExtend = split(buffer);

                            if (isExtend){
                                // 需要扩容
                                ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity()*2);

                                buffer.flip();
                                // 将旧的buffer中数据,,同步到新的buffer中
                                newBuffer.put(buffer);

                                // 替换新的附件
                                key.attach(newBuffer);
                            }

//                            // 这个buffer读到最后,还是没有提取出来,,
//                            if (buffer.position() == buffer.limit()){
//                                // 需要扩容
//                                ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity()*2);
//                                buffer.flip();
//                                // 将旧的buffer中数据,,同步到新的buffer中
//                                newBuffer.put(buffer);
//
//                                // 替换新的附件
//                                key.attach(newBuffer);
//                            }
                        }

//                        String s = Charset.defaultCharset().decode(buffer).toString();
//                        System.out.println("s = " + s);
                    } catch (IOException e) {
                        key.cancel();
                        throw new RuntimeException(e);
                    }
                }

            }
        }


    }



    private static boolean split(ByteBuffer source){
//        debugAll(source);
        boolean flag = false;
        source.flip();
        for (int i = 0; i < source.limit(); i++) {
            if (source.get(i)== '\n'){
                int pointPosition = source.position();
                int len = i+1 - pointPosition;
                ByteBuffer buffer = ByteBuffer.allocate(len);
                for (int j = 0; j < len; j++) {
                    byte b = source.get();
                    buffer.put(b);
                }
                flag = true;
                System.out.println("buffer1 = " + Charset.defaultCharset().decode(buffer));
                debugAll(buffer);
                buffer.flip();
                System.out.println("buffer2 = " + Charset.defaultCharset().decode(buffer));
//                System.out.println(Charset.defaultCharset().decode(buffer));
            }
        }

        source.compact();
        return !flag;
    }
public class Client {

    public static void main(String[] args) throws IOException {
        SocketChannel sc = SocketChannel.open();

//        sc.configureBlocking(false);

        sc.connect(new InetSocketAddress("localhost", 8080));


        SocketAddress localAddress = sc.getLocalAddress();
        sc.write(Charset.defaultCharset().encode("hello\n123131server\n"));

        new Scanner(System.in).next();

        System.in.read();

    }
}
  • 客户端正常关闭会触发read事件,导致服务端无限循环去处理这个read事件
    判断 如果read返回-1,表示没读到数据,,客户端已经关闭,,使用 cancel() 取消事件
if(key.isReadable()){
                    ByteBuffer buffer = ByteBuffer.allocate(2);

                    // 关闭客户端会触发读事件  ,,这个read会进入selectkey
                    try {
                        SocketChannel channel = (SocketChannel) key.channel();

                        // 返回读到的字节数,,,如果返回-1 : 表示正常断开
                        int read = channel.read(buffer);
                        if (read == -1){
                            key.cancel();
                        }else {
                            buffer.flip();
//                            debugAll(buffer);
                            System.out.println(Charset.defaultCharset().decode(buffer));
                        }
                        
                    } catch (IOException e) {
                        // 异常断开
                        key.cancel();
                        throw new RuntimeException(e);
                    }
                }
  • 如果服务器发送很大的数据,,网络缓冲区一次性读不下,,就需要注册一个write事件进去,让Selector监测一旦网络缓冲区有位置了就去执行write事件
public class WriteServer {


    public static void main(String[] args) throws IOException {
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);


        Selector selector = Selector.open();
        ssc.register(selector, SelectionKey.OP_ACCEPT);


        ssc.bind(new InetSocketAddress(8080));
        System.out.println("server start");

        while(true){
            selector.select();
            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();

            while(iterator.hasNext()){
                SelectionKey key = iterator.next();
                iterator.remove();
                if(key.isAcceptable()){

//                    key.channel()
                    // serverSocketChannel只有一个,,就是创建的那个
                    SocketChannel sc = ssc.accept();
                    System.out.println("有客户端连接了"+sc);
                    sc.configureBlocking(false);
                    SelectionKey scKey = sc.register(selector, 0, null);
                    scKey.interestOps(SelectionKey.OP_READ);


                    // 向客户端发送大量数据
                    StringBuilder sb = new StringBuilder();
                    for (int i = 0; i < 3000000; i++) {
                        sb.append("a");
                    }

                    ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());
                    // 这个write并不能保证,一次性吧数据都写给客户端 ==> 返回值表示一次写了多少字节
//                    while(buffer.hasRemaining()){
//                        // 网络的缓冲区是有限制的,,写不进去了,返回就是0 ===》 这样不符合非阻塞的思想,,,只要内容没发完,就一直在循环这里卡着,,虽然能将大量的数据发送给客户端,但是效率不搞
//                        // 发送缓冲区是有限制的  ==》 不要一直卡在这里
//                        int write = sc.write(buffer);
//                        System.out.println("write = " + write);
//                    }


                    if (buffer.hasRemaining()) {
                        // 是否有剩余内容
                        // 注册写事件   ===> 必须把之前的interest加上去,,不然会把之前的事件覆盖掉
                        scKey.interestOps(scKey.interestOps() + SelectionKey.OP_WRITE);
//                        scKey.interestOps(scKey.interestOps() | SelectionKey.OP_WRITE);
                        // 把buffer关联到selectionKey
                        scKey.attach(buffer);

                    }

                }else if(key.isWritable()){

                    SocketChannel sc = (SocketChannel) key.channel();
                    ByteBuffer buffer = (ByteBuffer) key.attachment();

                    int write = sc.write(buffer);
                    System.out.println("write = " + write);
//                    if (write < buffer.)

                    // 写完了,,清除buffer,不用再关注可写事件
                    if (!buffer.hasRemaining()){
                        key.attach(null);
                        key.interestOps(key.interestOps() - SelectionKey.OP_WRITE);
                    }


                }
            }
        }

    }
}

public class WriteClient {

    public static void main(String[] args) throws IOException {
        SocketChannel sc = SocketChannel.open();
        sc.connect(new InetSocketAddress("localhost", 8080));

        int count = 0;
        // 接收数据
        while (true) {
            ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
            count += sc.read(buffer);
            System.out.println("count = " + count);

            // 重置指针
            buffer.clear();
        }
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2308203.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【考试大纲】中级网络工程师考试大纲(最新版与旧版对比)

目录 引言考试科目1:网络工程师基础知识考试科目2:网络工程师应用技术引言 最新的网络工程师考试大纲出版于 2024 年 10 月,本考试大纲基于此版本整理。 考试科目1:网络工程师基础知识 计算机系统知识1.1 计算机硬件知识 1.2 操作系统知识 1.3 系统管理 系统开发和运行…

Spring的下载与配置

1. 下载spring开发包 下载地址&#xff1a;https://repo.spring.io/webapp/#/artifacts/browse/simple/General/libs-release-local/org/springframework/spring 打开之后可以看到有很多版本供选择&#xff0c;因为视频教程用的是4.2.4版本&#xff0c;于是我也选择这个 右键…

解决IDEA使用Ctrl + / 注释不规范问题

问题描述&#xff1a; ctrl/ 时&#xff0c;注释缩进和代码规范不一致问题 解决方式 设置->编辑器->代码样式->java->代码生成->注释代码

学术小助手智能体

学术小助手&#xff1a;开学季的学术领航员 文心智能体平台AgentBuilder | 想象即现实 文心智能体平台AgentBuilder&#xff0c;是百度推出的基于文心大模型的智能体平台&#xff0c;支持广大开发者根据自身行业领域、应用场景&#xff0c;选取不同类型的开发方式&#xff0c;…

kafka-leader -1问题解决

一. 问题&#xff1a; 在 Kafka 中&#xff0c;leader -1 通常表示分区的领导者副本尚未被选举出来&#xff0c;或者在获取领导者信息时出现了问题。以下是可能导致出现 kafka leader -1 的一些常见原因及相关分析&#xff1a; 1. 副本同步问题&#xff1a; 在 Kafka 集群中&…

【开源-鸿蒙土拨鼠大理石系统】鸿蒙 HarmonyOS Next App+微信小程序+云平台

✨本人自己开发的开源项目&#xff1a;土拨鼠充电系统 ✨踩坑不易&#xff0c;还希望各位大佬支持一下&#xff0c;在GitHub给我点个 Start ⭐⭐&#x1f44d;&#x1f44d; ✍GitHub开源项目地址&#x1f449;&#xff1a;https://github.com/lusson-luo/HarmonyOS-groundhog-…

HBuilder X中,uni-app、js的延时操作及定时器

完整源码下载 https://download.csdn.net/download/luckyext/90430165 在HBuilder X中&#xff0c;uni-app、js的延时操作及定时器可以用setTimeout和setInterval这两个函数来实现。 1.setTimeout函数用于在指定的毫秒数后执行一次函数。 例如&#xff0c; 2秒后弹出一个提…

升级TTSDK抖音小游戏banner广告接入

升级TTSDK抖音小游戏banner广告接入 介绍修改总结 介绍 我们原来使用的是unity2021&#xff0c;这次为了抖音新出的TTSDK中的新的API升级我们将项目升级为了unity2022&#xff0c;这次抖音官方剔除了原来StartSDKUnityTools和Start Asset Analyser&#xff08;startmini&#x…

ubuntu终端指令集 shell编程基础(一)

磁盘指令 连接与查看&#xff1a;磁盘与 Ubuntu 有两种连接方式&#xff1b;使用ls /dev/sd*查看是否连接成功&#xff0c;通过df系列指令查看磁盘使用信息。若 U 盘已挂载&#xff0c;相关操作可能失败&#xff0c;需用umount取消挂载。磁盘操作&#xff1a;使用sudo fdisk 磁…

win11编译pytorch cuda128版本流程

Geforce 50xx系显卡最低支持cuda128&#xff0c;torch cu128 release版本目前还没有释放&#xff0c;所以自己基于2.6.0源码自己编译wheel包。 1. 前置条件 1. 使用visual studio installer 安装visual studio 2022&#xff0c;工作负荷选择【使用c的桌面开发】,安装完成后将…

STM32G431RBT6——(1)芯片命名规则

相信很多新手入门STM学的芯片&#xff0c;是STM32F103C8T6&#xff0c;假如刷到个项目换个芯片类型&#xff0c;就会感到好难啊&#xff0c;看不懂&#xff0c;就无从下手&#xff0c;不知所云。其实没什么难的&#xff0c;对于一个个不同的芯片的区别&#xff0c;就像是学习包…

Ecode前后端传值

说明 在泛微 E9 系统开发过程中&#xff0c;使用 Ecode 调用后端接口并进行传值是极为常见且关键的操作。在上一篇文章中&#xff0c;我们探讨了 Ecode 调用后端代码的相关内容&#xff0c;本文将深入剖析在 Ecode 中如何向后端传值&#xff0c;以及后端又该如何处理接收这些值…

Wireshark:自定义类型帧解析

文章目录 1. 前言2. 背景3. 开发 Lua 插件 1. 前言 限于作者能力水平&#xff0c;本文可能存在谬误&#xff0c;因此而给读者带来的损失&#xff0c;作者不做任何承诺。 2. 背景 Wireshark 不认识用 tcpdump 抓取的数据帧&#xff0c;仔细分析相关代码和数据帧后&#xff0c…

2继续NTS库学习(读取shapefile)

引用库如下&#xff1a; 读取shapefile代码如下&#xff1a; namespace IfoxDemo {public class Class1{[CommandMethod("xx")]public static void nts二次学习(){Document doc Application.DocumentManager.MdiActiveDocument;var ed doc.Editor;string shpPath …

JavaWeb后端基础(3)

原打算把Mysql操作数据库的一些知识写进去&#xff0c;但是感觉没必要&#xff0c;要是现在会的都是简单的增删改查&#xff0c;所以&#xff0c;这一篇&#xff0c;我直接从java操作数据库开始写&#xff0c;所以这一篇大致就是记一下JDBC、MyBatis、以及SpringBoot的配置文件…

Vue程序下载

Vue是一个基于JavaScript&#xff08;JS&#xff09;实现的框架&#xff0c;想要使用它&#xff0c;就得先拿到Vue的js文件 Vue官网 Vue2&#xff1a;Vue.js Vue3&#xff1a;Vue.js - 渐进式 JavaScript 框架 | Vue.js 下载并安装vue.js 第一步&#xff1a;打开Vue2官网&a…

力扣 寻找重复数

二分&#xff0c;双指针&#xff0c;环形链表。 题目 不看完题就是排序后&#xff0c;用两个快慢指针移动&#xff0c;找到相同就返回即可。 class Solution {public int findDuplicate(int[] nums) {Arrays.sort(nums);int l0;int r1;while(r<nums.length){if(nums[l]num…

使用Docker将ros1自定义消息通过rosjava_bootstrap生成jar包

文章目录 预准备环境rosjava_bootstrap坏消息好消息 环境安装docker安装rosjava_bootstrap仓库rosjava_center仓库修改rosjava_bootstrap代码拉取docker镜像放置自己的自定义消息 启动docker编译 预准备环境 rosjava_bootstrap rosjava_bootstrap是将自定义的ROS消息生成java…

分治算法、动态规划、贪心算法、分支限界法和回溯算法的深度对比

1. 分治算法 (Divide and Conquer) 核心思想 分治法三步曲&#xff1a; 分解&#xff08;Divide&#xff09;&#xff1a;将原问题拆分为多个子问题解决&#xff08;Conquer&#xff09;&#xff1a;递归解决子问题合并&#xff08;Combine&#xff09;&#xff1a;合并子问题…

网络安全红队工具

目录 红队及发展趋势 基本概念 发展趋势 防守阶段 备战阶段 临战阶段 实战阶段 战后整顿 如果错过互联网,与你擦肩而过的不仅仅是机会,而是整整一个时代。 红队及发展趋势 基本概念 红队一般指实战攻防的防守方。 红队主要复盘总结现有防护系统的不足之处&#xff0c;为…