【Netty】nio处理acceptreadwrite事件

news2025/1/11 14:09:52

       📝个人主页:五敷有你      

 🔥系列专栏:Netty

⛺️稳中求进,晒太阳

1.处理accept

1.1客户端代码

public class Client {
    public static void main(String[] args) {
        try (Socket socket = new Socket("localhost", 8080)) {
            System.out.println(socket);
            socket.getOutputStream().write("world".getBytes());
            System.in.read();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

1.2 服务端代码

@Slf4j
public class ChannelDemo {
    public static void main(String[] args) {
        try (ServerSocketChannel channel = ServerSocketChannel.open()) {
            channel.bind(new InetSocketAddress(8080));
            System.out.println(channel);
            Selector selector = Selector.open();
            channel.configureBlocking(false);
            channel.register(selector, SelectionKey.OP_ACCEPT);

            while (true) {
                int count = selector.select();
//                int count = selector.selectNow();
                log.debug("select count: {}", count);
//                if(count <= 0) {
//                    continue;
//                }

                // 获取所有事件
                Set<SelectionKey> keys = selector.selectedKeys();

                // 遍历所有事件,逐一处理
                Iterator<SelectionKey> iter = keys.iterator();
                while (iter.hasNext()) {
                    SelectionKey key = iter.next();
                    // 判断事件类型
                    if (key.isAcceptable()) {
                        ServerSocketChannel c = (ServerSocketChannel) key.channel();
                        // 必须处理
                        SocketChannel sc = c.accept();
                        log.debug("{}", sc);
                    }
                    // 处理完毕,必须将事件移除
                    iter.remove();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

💡 事件发生后能否不处理

事件发生后,要么处理,要么取消(cancel),不能什么都不做,否则下次该事件仍会触发,这是因为 nio 底层使用的是水平触发

水平触发是什么

参考这篇文章:细说八股 | NIO的水平触发和边缘触发到底有什么区别? - 掘金 (juejin.cn)

2. 处理read事件

2.1 服务端代码

@Slf4j
public class ChannelDemo {
    public static void main(String[] args) {
        try (ServerSocketChannel channel = ServerSocketChannel.open()) {
            channel.bind(new InetSocketAddress(8080));
            System.out.println(channel);
            Selector selector = Selector.open();
            channel.configureBlocking(false);
            channel.register(selector, SelectionKey.OP_ACCEPT);

            while (true) {
                int count = selector.select();
//                int count = selector.selectNow();
                log.debug("select count: {}", count);
//                if(count <= 0) {
//                    continue;
//                }

                // 获取所有事件
                Set<SelectionKey> keys = selector.selectedKeys();

                // 遍历所有事件,逐一处理
                Iterator<SelectionKey> iter = keys.iterator();
                while (iter.hasNext()) {
                    SelectionKey key = iter.next();
                    // 判断事件类型
                    if (key.isAcceptable()) {
                        ServerSocketChannel c = (ServerSocketChannel) key.channel();
                        // 必须处理
                        SocketChannel sc = c.accept();
                        sc.configureBlocking(false);
                        sc.register(selector, SelectionKey.OP_READ);
                        log.debug("连接已建立: {}", sc);
                    } else if (key.isReadable()) {
                        SocketChannel sc = (SocketChannel) key.channel();
                        ByteBuffer buffer = ByteBuffer.allocate(128);
                        int read = sc.read(buffer);
                        if(read == -1) {
                            key.cancel();
                            sc.close();
                        } else {
                            buffer.flip();
                            debug(buffer);
                        }
                    }
                    // 处理完毕,必须将事件移除
                    iter.remove();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

💡 为何要 iter.remove()

因为 select 在事件发生后,就会将相关的 key 放入 selectedKeys 集合,但不会在处理完后从 selectedKeys 集合中移除,需要我们自己编码删除。例如

  • 第一次触发了 ssckey 上的 accept 事件,没有移除 ssckey

  • 第二次触发了 sckey 上的 read 事件,但这时 selectedKeys 中还有上次的 ssckey ,在处理时因为没有真正的 serverSocket 连上了,就会导致空指针异常

💡 cancel 的作用

cancel 会取消注册在 selector 上的 channel,并从 keys 集合中删除 key 后续不会再监听事件。

注意:

关闭远程连接会发送一次可读事件,返回值为-1,需要cancel和close和remove

⚠️ 不处理边界的问题

服务端

public class Server {
    public static void main(String[] args) throws IOException {
        ServerSocket ss=new ServerSocket(9000);
        while (true) {
            Socket s = ss.accept();
            InputStream in = s.getInputStream();
            // 这里这么写,有没有问题
            byte[] arr = new byte[4];
            while(true) {
                int read = in.read(arr);
                // 这里这么写,有没有问题
                if(read == -1) {
                    break;
                }
                System.out.println(new String(arr, 0, read));
            }
        }
    }
}

客户端

public class Client {
    public static void main(String[] args) throws IOException {
        Socket max = new Socket("localhost", 9000);
        OutputStream out = max.getOutputStream();
        out.write("hello".getBytes());
        out.write("world".getBytes());
        out.write("你好".getBytes());
        max.close();
    }
}

输出

为什么呢???????

处理消息的边界(3种)

  1. 一种思路是固定消息长度,数据包大小一样,服务器按预定长度读取,缺点是浪费带宽

  2. 另一种思路是按分隔符拆分,缺点是效率低

  3. TLV 格式,即 Type 类型、Length 长度、Value 数据,类型和长度已知的情况下,就可以方便获取消息大小,分配合适的 buffer,缺点是 buffer 需要提前分配,如果内容过大,则影响 server 吞吐量

服务端

下述代码:按分隔符拆分,然后动态扩容

private static void split(ByteBuffer source) {
    source.flip();
    for (int i = 0; i < source.limit(); i++) {
        // 找到一条完整消息
        if (source.get(i) == '\n') {
            int length = i + 1 - source.position();
            // 把这条完整消息存入新的 ByteBuffer
            ByteBuffer target = ByteBuffer.allocate(length);
            // 从 source 读,向 target 写
            for (int j = 0; j < length; j++) {
                target.put(source.get());
            }
            debugAll(target);
        }
    }
    source.compact(); // 0123456789abcdef  position 16 limit 16
}

public static void main(String[] args) throws IOException {
    // 1. 创建 selector, 管理多个 channel
    Selector selector = Selector.open();
    ServerSocketChannel ssc = ServerSocketChannel.open();
    ssc.configureBlocking(false);
    // 2. 建立 selector 和 channel 的联系(注册)
    // SelectionKey 就是将来事件发生后,通过它可以知道事件和哪个channel的事件
    SelectionKey sscKey = ssc.register(selector, 0, null);
    // key 只关注 accept 事件
    sscKey.interestOps(SelectionKey.OP_ACCEPT);
    log.debug("sscKey:{}", sscKey);
    ssc.bind(new InetSocketAddress(8080));
    while (true) {
        // 3. select 方法, 没有事件发生,线程阻塞,有事件,线程才会恢复运行
        // select 在事件未处理时,它不会阻塞, 事件发生后要么处理,要么取消,不能置之不理
        selector.select();
        // 4. 处理事件, selectedKeys 内部包含了所有发生的事件
        Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); // accept, read
        while (iter.hasNext()) {
            SelectionKey key = iter.next();
            // 处理key 时,要从 selectedKeys 集合中删除,否则下次处理就会有问题
            iter.remove();
            log.debug("key: {}", key);
            // 5. 区分事件类型
            if (key.isAcceptable()) { // 如果是 accept
                ServerSocketChannel channel = (ServerSocketChannel) key.channel();
                SocketChannel sc = channel.accept();
                sc.configureBlocking(false);
                ByteBuffer buffer = ByteBuffer.allocate(16); // attachment
                // 将一个 byteBuffer 作为附件关联到 selectionKey 上
                SelectionKey scKey = sc.register(selector, 0, buffer);
                scKey.interestOps(SelectionKey.OP_READ);
                log.debug("{}", sc);
                log.debug("scKey:{}", scKey);
            } else if (key.isReadable()) { // 如果是 read
                try {
                    SocketChannel channel = (SocketChannel) key.channel(); // 拿到触发事件的channel
                    // 获取 selectionKey 上关联的附件
                    ByteBuffer buffer = (ByteBuffer) key.attachment();
                    int read = channel.read(buffer); // 如果是正常断开,read 的方法的返回值是 -1
                    if(read == -1) {
                        key.cancel();
                    } else {
                        split(buffer);
                        // 需要扩容
                        if (buffer.position() == buffer.limit()) {
                            ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity() * 2);
                            buffer.flip();
                            newBuffer.put(buffer); // 0123456789abcdef3333\n
                            key.attach(newBuffer);
                        }
                    }

                } catch (IOException e) {
                    e.printStackTrace();
                    key.cancel();  // 因为客户端断开了,因此需要将 key 取消(从 selector 的 keys 集合中真正删除 key)
                }
            }
        }
    }
}

客户端

SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress("localhost", 8080));
SocketAddress address = sc.getLocalAddress();
// sc.write(Charset.defaultCharset().encode("hello\nworld\n"));
sc.write(Charset.defaultCharset().encode("0123\n456789abcdef"));
sc.write(Charset.defaultCharset().encode("0123456789abcdef3333\n"));
System.in.read();

ByteBuffer 大小分配
  • 每个 channel 都需要记录可能被切分的消息,因为 ByteBuffer 不能被多个 channel 共同使用,因此需要为每个 channel 维护一个独立的 ByteBuffer

  • ByteBuffer 不能太大,比如一个 ByteBuffer 1Mb 的话,要支持百万连接就要 1Tb 内存,因此需要设计大小可变的 ByteBuffer

    • 一种思路是首先分配一个较小的 buffer,例如 4k,如果发现数据不够,再分配 8k 的 buffer,将 4k buffer 内容拷贝至 8k buffer,优点是消息连续容易处理,缺点是数据拷贝耗费性能。

    • 另一种思路是用多个数组组成 buffer,一个数组不够,把多出来的内容写入新的数组,与前面的区别是消息存储不连续解析复杂,优点是避免了拷贝引起的性能损耗

3.处理Write事件

write就是将buffer中的内容通过channel传输过去。

  • 非阻塞模式下,无法保证把 buffer 中所有数据都写入 channel,因此需要追踪 write 方法的返回值(代表实际写入字节数)

  • 用 selector 监听所有 channel 的可写事件,每个 channel 都需要一个 key 来跟踪 buffer,但这样又会导致占用内存过多,就有两阶段策略

    • 当消息处理器第一次写入消息时,才将 channel 注册到 selector 上

    • selector 检查 channel 上的可写事件,如果所有的数据写完了,就取消 channel 的注册

    • 如果不取消,会每次可写均会触发 write 事件

public class WriteServer {

    public static void main(String[] args) throws IOException {
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);
        ssc.bind(new InetSocketAddress(8080));

        Selector selector = Selector.open();
        ssc.register(selector, SelectionKey.OP_ACCEPT);

        while(true) {
            selector.select();

            Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
            while (iter.hasNext()) {
                SelectionKey key = iter.next();
                iter.remove();
                if (key.isAcceptable()) {
                    SocketChannel sc = ssc.accept();
                    sc.configureBlocking(false);
                    SelectionKey sckey = sc.register(selector, SelectionKey.OP_READ);
                    // 1. 向客户端发送内容
                    StringBuilder sb = new StringBuilder();
                    for (int i = 0; i < 3000000; i++) {
                        sb.append("a");
                    }
                    ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());
                    int write = sc.write(buffer);
                    // 3. write 表示实际写了多少字节
                    System.out.println("实际写入字节:" + write);
                    // 4. 如果有剩余未读字节,才需要关注写事件
                    if (buffer.hasRemaining()) {
                        // read 1  write 4
                        // 在原有关注事件的基础上,多关注 写事件
                        sckey.interestOps(sckey.interestOps() + SelectionKey.OP_WRITE);
                        // 把 buffer 作为附件加入 sckey
                        sckey.attach(buffer);
                    }
                } else if (key.isWritable()) {
                    ByteBuffer buffer = (ByteBuffer) key.attachment();
                    SocketChannel sc = (SocketChannel) key.channel();
                    int write = sc.write(buffer);
                    System.out.println("实际写入字节:" + write);
                    if (!buffer.hasRemaining()) { // 写完了
                        key.interestOps(key.interestOps() - SelectionKey.OP_WRITE);
                        key.attach(null);
                    }
                }
            }
        }
    }
}

客户端

public class WriteClient {
    public static void main(String[] args) throws IOException {
        Selector selector = Selector.open();
        SocketChannel sc = SocketChannel.open();
        sc.configureBlocking(false);
        sc.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
        sc.connect(new InetSocketAddress("localhost", 8080));
        int count = 0;
        while (true) {
            selector.select();
            Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
            while (iter.hasNext()) {
                SelectionKey key = iter.next();
                iter.remove();
                if (key.isConnectable()) {
                    System.out.println(sc.finishConnect());
                } else if (key.isReadable()) {
                    ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
                    count += sc.read(buffer);
                    buffer.clear();
                    System.out.println(count);
                }
            }
        }
    }
}
💡 write 为何要取消

只要向 channel 发送数据时,socket 缓冲可写,这个事件会频繁触发,因此应当只在 socket 缓冲区写不下时再关注可写事件,数据写完之后再取消关注.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1834030.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

秋招突击——6/16——复习{整理昨天的面试资料}——新作{删除链表倒数第n个节点}

文章目录 引言复习新作删除链表倒数第N个节点题目描述个人实现参考实现 总结 引言 主管面&#xff0c;面的很凄惨&#xff0c;不过无所谓了&#xff0c;我已经尽力了。上午都在整理的面经&#xff0c;没有复习算法&#xff0c;而且这两天要弄一下论文&#xff0c;二十号就要提…

Aspice介绍——测试流程

文章目录 ASPICE简介一、V字模型的示意二、测试领域2.1 SWE.6&#xff1a;软件合格性测试过程目的过程成果基本实践&#xff08;BP&#xff09; 2.2 SYS.4:系统集成和集成测试过程目的过程成果基本实践&#xff08;BP&#xff09; 2.3 SYS.5&#xff1a;系统合格性测试过程目的…

AI早班2024.6.18

先一步知道AI未来&#xff01; 全球AI新闻速递 1.绿米 AI 智能存在传感器 FP1E开售。 2.摩尔线程 师者AI&#xff1a;完成70亿参数教育AI大模型训练测试。 3.Google 在 AI 功能推出新功能&#xff0c;需要明确说明可能出错的地方。 4.北大、快手攻克复杂视频生成难题&#…

【unity笔记】三、冰山碰撞变成碎块效果

一、模型准备 共需准备两个模型&#xff0c;一个原始模型&#xff0c;一个破碎后的模型。 破碎后的模型制作教程&#xff1a; 下载Blender 导入原始模型在添加偏好设置中添加Cell Fracture插件&#xff0c;调整模型碎裂效果。导出&#xff0c;保存到项目预制体文件夹。 二、…

性能测试项目实战

项目介绍和部署 项目背景 轻商城项目是一个现在流行的电商项目。我们需要综合评估该项目中各个关键接口的性能&#xff0c;并给出优化建议&#xff0c;以满足项目上线后的性能需要。 项目功能架构 前台商城&#xff1a;购物车、订单、支付、优惠券等 后台管理系统&#xf…

基于springboot实现药店管理系统项目【项目源码+论文说明】计算机毕业设计

基于springboot实现药店管理系统演示 摘要 传统信息的管理大部分依赖于管理人员的手工登记与管理&#xff0c;然而&#xff0c;随着近些年信息技术的迅猛发展&#xff0c;让许多比较老套的信息管理模式进行了更新迭代&#xff0c;药品信息因为其管理内容繁杂&#xff0c;管理数…

wireshark使用情况与网口调试记录

wireshark使用情况与网口调试记录 前言wireshark无法获取本地数据方法一——Npcap方法二——WinPcap效果 UDP组播&#xff0c;却一直捕获到127.0.0.1总结 前言 在网口调试中&#xff0c;wireshark使用较多&#xff0c;常出现一些无法捕获或者ip获取数据不正确的情况&#xff0…

webpack工作流程

webpack工作流程 初始化参数&#xff1a;从配置文件和 Shell 语句中读取并合并参数,得出最终的配置对象用上一步得到的参数初始化 Compiler 对象加载所有配置的插件执行对象的 run 方法开始执行编译根据配置中的entry找出入口文件从入口文件出发,调用所有配置的Loader对模块进…

如何确保pcdn的稳定性?(壹)

确保PCDN的稳定性是一个重要任务&#xff0c;涉及多个方面的操作和考虑。以下是一些建议&#xff0c;帮助你确保PCDN的稳定性&#xff1a; 一&#xff0e;选择合适的服务器与硬件&#xff1a; 选择稳定可靠的服务器供应商和硬件设备&#xff0c;确保服务器具有高可用性和容错…

openGauss 6.0一主二备高可用架构部署,可靠很行

作者&#xff1a;IT邦德 中国DBA联盟(ACDU)成员&#xff0c;10余年DBA工作经验&#xff0c; Oracle、PostgreSQL ACE CSDN博客专家及B站知名UP主&#xff0c;全网粉丝10万 擅长主流Oracle、MySQL、PG、高斯及Greenplum备份恢复&#xff0c; 安装迁移&#xff0c;性能优化、故障…

支撑每秒 600 万订单无压力,SpringBoot + Disruptor 太猛了!

一、背景 工作中遇到项目使用Disruptor做消息队列,对你没看错,不是Kafka,也不是rabbitmq;Disruptor有个最大的优点就是快,还有一点它是开源的哦,下面做个简单的记录. 二、Disruptor介绍 Disruptor 是英国外汇交易公司LMAX开发的一个高性能队列&#xff0c;研发的初衷是解决内存…

软银与AI初创公司Perplexity达成合作;OpenAI或将改变治理结构,考虑成立营利性公司

&#x1f989; AI新闻 &#x1f680; 软银与AI初创公司Perplexity达成合作 摘要&#xff1a;6月17日消息&#xff0c;日本软银宣布与AI初创公司Perplexity达成战略合作&#xff0c;将于6月19日起向Softbank、Y-Mobile和LINEMO用户开放Perplexity Pro一年的免费试用申请。Perp…

Einops 张量操作快速入门

张量&#xff0c;即多维数组&#xff0c;是现代机器学习框架的支柱。操纵这些张量可能会变得冗长且难以阅读&#xff0c;尤其是在处理高维数据时。Einops 使用简洁的符号简化了这些操作。 Einops &#xff08;Einstein-Inspired Notation for operations&#xff09;&#xff…

CentOS 7 下gdb任意版本的升级

文章目录 前言查看gdb版本升级步骤 小结 前言 在做项目的过程中&#xff0c;遇到了难缠的bug&#xff0c;使用gdb调试的时候&#xff0c;bt调用堆栈看的一震头疼&#xff0c;于是就想起把gdb升级一下 当前环境&#xff1a;Centos7&#xff0c;gdb&#xff1a;7.6 稍微好看了那…

SpringCloudStream原理和深入使用

简单概述 Spring Cloud Stream是一个用于构建与共享消息传递系统连接的高度可扩展的事件驱动型微服务的框架。 应用程序通过inputs或outputs来与Spring Cloud Stream中binder对象交互&#xff0c;binder对象负责与消息中间件交互。也就是说&#xff1a;Spring Cloud Stream能…

AI工具快速制作爆火的影视视频混剪

今天给大家发一个有意思的工具&#xff0c;影视混剪大家应该都刷到过&#xff0c;像下面这种视频&#xff0c;播放量都超级高。 这种视频都是怎么做的呢&#xff1f; 现在AI工具这么多样性&#xff0c;先用 AI 写一段具有网感的对话段子&#xff0c;然后找影视剧片段混剪成一…

笑脸金融测试社招面经,期望20K

面经哥只做互联网社招面试经历分享&#xff0c;关注我&#xff0c;每日推送精选面经&#xff0c;面试前&#xff0c;先找面经哥 测试总监一面 1、问一些测试理论相关的知识。 自我介绍、质量模型 2、登录如何设计测试用例。 3、给你一个东西你会从哪些方面去考虑设计测试用…

【数据结构初阶】--- 堆的应用:topk

堆的功能&#xff1a;topk 为什么使用topk 先举个例子&#xff0c;假如说全国有十万家奶茶店&#xff0c;我现在想找到评分前十的店铺&#xff0c;现在应该怎么实现&#xff1f; 第一想法当然是排序&#xff0c;由大到小排序好&#xff0c;前十就能拿到了。这是一种方法&…

2024 年最新 Python 基于 LangChain 框架基础案例详细教程(更新中)

LangChain 框架搭建 安装 langchain pip install langchain -i https://mirrors.aliyun.com/pypi/simple/安装 langchain-openai pip install langchain-openai -i https://mirrors.aliyun.com/pypi/simple/ChatOpenAI 配置环境变量 环境变量 OPENAI_API_KEYOpenAI API 密钥…

在IDEA 2024.1.3 (Community Edition)中创建Maven项目

本篇博客承继自博客Windows系统Maven下载安装-CSDN博客 Maven版本&#xff1a;maven-3.9.5 修改设置&#xff1a; 首先先对Idea的Maven依赖进行设置&#xff1b;打开Idea&#xff0c;选择“Costomize”&#xff0c;选择最下边的"All settings" 之后找到Maven选项&…