Netty学习(Netty入门)

news2025/1/13 9:30:20

概述

Netty是什么

在这里插入图片描述

Netty的地位

在这里插入图片描述

Netty的优势

在这里插入图片描述

HelloWorld

public class HelloClient {
    public static void main(String[] args) throws InterruptedException {
        // 1. 启动类
        new Bootstrap()
            // 2. 添加 EventLoop
            .group(new NioEventLoopGroup())
            // 3. 选择客户端 channel 实现
            .channel(NioSocketChannel.class)
            // 4. 添加处理器
            .handler(new ChannelInitializer<NioSocketChannel>() {
                @Override // 在连接建立后被调用
                protected void initChannel(NioSocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new StringEncoder());
                }
            })
            // 5. 连接到服务器
            .connect(new InetSocketAddress("localhost", 8080))
            .sync()
            .channel()
            // 6. 向服务器发送数据
            .writeAndFlush("hello, world");
    }
}


public class HelloServer {
    public static void main(String[] args) {
        // 1. 启动器,负责组装 netty 组件,启动服务器
        new ServerBootstrap()
            // 2. BossEventLoop, WorkerEventLoop(selector,thread), group 组
            .group(new NioEventLoopGroup())
            // 3. 选择 服务器的 ServerSocketChannel 实现
            .channel(NioServerSocketChannel.class) // OIO BIO
            // 4. boss 负责处理连接 worker(child) 负责处理读写,决定了 worker(child) 能执行哪些操作(handler)
            .childHandler(
                    // 5. channel 代表和客户端进行数据读写的通道 Initializer 初始化,负责添加别的 handler
                new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel ch) throws Exception {
                    // 6. 添加具体 handler
                    ch.pipeline().addLast(new LoggingHandler());
                    ch.pipeline().addLast(new StringDecoder()); // 将 ByteBuf 转换为字符串
                    ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { // 自定义 handler
                        @Override // 读事件
                        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                            System.out.println(msg); // 打印上一步转换好的字符串
                        }
                    });
                }
            })
            // 7. 绑定监听端口
            .bind(8080);
    }
}

流程分析

在这里插入图片描述

正确理解

正确理解 Netty中各个组件的功能和职责
在这里插入图片描述

组件

EventLoop

在这里插入图片描述

普通任务和定时任务


@Slf4j
public class TestEventLoop {
    public static void main(String[] args) {
        // 1. 创建事件循环组
        EventLoopGroup group = new NioEventLoopGroup(2); // io 事件,普通任务,定时任务
//        EventLoopGroup group = new DefaultEventLoopGroup(); // 普通任务,定时任务
        // 2. 获取下一个事件循环对象
        System.out.println(group.next());
        System.out.println(group.next());
        System.out.println(group.next());
        System.out.println(group.next());

        // 3. 执行普通任务
        group.next().execute(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            log.debug("ok");
        });

        // 4. 执行定时任务
        group.next().scheduleAtFixedRate(() -> {
            log.debug("ojbk");
        }, 0, 1, TimeUnit.SECONDS);

        log.debug("main");
    }
}

IO任务

Netty客户端是多线程程序,idea debug 默认断点模式为ALL,即会停止主线程以及守护线程,所以当客户端断点自定义Evaluate发送数据时,守护线程的发送数据Channel也被断点停止,所以无法发送数据

选择Thread只停止当前线程,守护线程仍然可以运行
在这里插入图片描述

一个客户端的NIO线程跟Channel建立链接就会建立一个绑定关系,后续客户端的Channel上的IO事件都由一个EventLoop处理,
客户端-Channel-EventLoop绑定关系

在这里插入图片描述

EventLoop的分工细化

第一次细分,Netty建议将EventLoop职责细分,分为boss和worker
group中传入两个EventLoop,那么boss只负责accept事件,worker负责read事件

在这里插入图片描述

上诉优化,worker中的NIOEventLoopGroup除了要负责SocketChannel的NIO连接操作还要负责连接后的读写操作,如果读写较长较重,那么会阻塞影响到worker其他的连接或读写操作,所以,
再次细分,EventLoop有两种实现,NIOEventLoopGroup 能处理IO事件普通任务和定时任务,DefaultEventLoopGroup只能处理普通任务和定时任务,将读写操作交给它去处理耗时较长的读写操作。

在这里插入图片描述

作为对比,第一个没有指定group,默认使用了worker的NIOEventLoopGroup来处理读写操作,而第二则使用了DefaultEventLoop来处理读写操作

切换线程

在这里插入图片描述

Channel

在这里插入图片描述

正确的链接建立:ChannelFuture

处理异步连接

由于连接的建立是耗时的,所以Channel必须等到连接建立完成再执行获取,否则是无效的

在这里插入图片描述
connect方法返回的ChannelFuture若没有阻塞等待连接,那么接下来获取到的Channel是没有建立好连接的Channel

在这里插入图片描述
如上两种方法异步等待NIO线程建立完毕

谁发起的调用谁等待链接结果

正确的链接关闭:CloseFuture的关闭

在这里插入图片描述

不能直接在主线程或其他线程中直接处理关闭操作,因为nioEventLoopGroup-2-1属于异步线程,此处close方法非阻塞,有可能在关闭操作还未完成就执行了关闭后操作

解决方法:
使用阻塞关闭方法,只有当channel真的关闭了才执行后面的方法

在这里插入图片描述
优雅的关闭:等待还未执行完的操作执行完后再关闭
在这里插入图片描述

为什么Netty是异步设计

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

Future & Promise

概述

在这里插入图片描述
在这里插入图片描述

Future

jdk中的Future

Future就是在线程之间传递结果的一个容器,是被动的获取结果,由执行完任务的线程给予的结果,没有暴露主动赋予结果的方法


@Slf4j
public class TestJdkFuture {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //1.创建线程池
        ExecutorService service = Executors.newFixedThreadPool(2);
        //2.提交任务
        Future<Object> future = service.submit(new Callable<Object>() {
            @Override
            public Object call() throws Exception {
                log.debug("执行计算");
                Thread.sleep(1000);
                return 50;
            }
        });
        //3.祝线程通过future获取结果,get是阻塞等待方法
        log.debug("等待结果");
        log.debug("结果{}",future.get());

    }
}

Netty 中的 Future

与jdk中的差不多,继承至jdk的Future,做了增强


@Slf4j
public class TestNettyFuture {

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        NioEventLoopGroup group = new NioEventLoopGroup();
        EventLoop eventLoop = group.next();
        //提交任务
        Future<Integer> future = eventLoop.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                log.debug("执行计算");
                System.out.println("执行计算");
                Thread.sleep(1000);
                return 50;
            }
        });
        //通过future获取结果,get是阻塞等待方法
        log.debug("等待结果");
        log.debug("结果{}",future.get());

        //异步方式获取结果
        future.addListener(new GenericFutureListener<Future<? super Integer>>() {
            @Override
            public void operationComplete(Future<? super Integer> future) throws Exception {
                //getNow非阻塞等待 立即获取结果
                log.debug("结果{}",future.getNow());
            }
        });

    }
}

Promise

Promise又继承至Netty的Future,功能更强大,可以主动填充结果,对于网络通信非常有用

@Slf4j
public class TestNettyPromise {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 1. 准备 EventLoop 对象
        EventLoop eventLoop = new NioEventLoopGroup().next();
        // 2. 可以主动创建 promise, 结果容器
        DefaultPromise<Integer> promise = new DefaultPromise<>(eventLoop);
        new Thread(() -> {
            // 3. 任意一个线程执行计算,计算完毕后向 promise 填充结果
            log.debug("开始计算...");
            try {
                int i = 1 / 0;
                Thread.sleep(1000);
                promise.setSuccess(80);
            } catch (Exception e) {
                e.printStackTrace();
                promise.setFailure(e);
            }

        }).start();
        // 4. 接收结果的线程
        log.debug("等待结果...");
        log.debug("结果是: {}", promise.get());
    }

}

Handler & Pipeline

Pipeline

在这里插入图片描述

Inbound

在这里插入图片描述
入栈是按入栈顺序出,出栈是按入栈顺序返出

channelRead 是一个调用链,如果中间没调用,那么后面的handler 则调用不到

Outbound

注意ctx.writeAndFlushch.writeAndFlush
ctx.writeAndFlush 是从当前调用的 handler 往后寻找 OutboundHandler,若之前没有执行到OutboundHandler 那么找不到OutboundHandler执行

ch.writeAndFlush则是从整个调用链的最前端 tail 处理开始往后寻找OutboundHandler
而且先执行调用链中的InboundHandler输入,中间的 OutboundHandler 被跳过不影响正常输入执行
在这里插入图片描述
如图ch.writeAndFlush 的调用执行流程
在这里插入图片描述

ByteBuffer

nettyByteBuf容量动态扩容,nettyByteBuffer 固定容量
在这里插入图片描述
netty 中 ByteBuffer 默认使用直接内存(系统内存、内存条)
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

例如 扩容 2 的整数倍 2^9=512 扩容至 2^10 =1024

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述
tail 只能处理原始 ByteBuf 如果中途 ByteBuf 被转换成其他数据类型,则 tail 无法自动release

零拷贝 slice

slice 是 netty 中对于零拷贝的体现之一
在这里插入图片描述
在这里插入图片描述

切片后生成的对象,实际上还是操作原始bytebuf 的内容
在这里插入图片描述

使用习惯,切片自己增加引用计数,避免被其他调用者释放
在这里插入图片描述
在这里插入图片描述

component 组合零拷贝

writeBytes 会发生真正的数据复制,每次writeBytes都会发生数据复制
addComponents 是使逻辑上连续,没有发生复制

在这里插入图片描述

在这里插入图片描述

双向通信

实现一个 echo server

编写 server

new ServerBootstrap()
    .group(new NioEventLoopGroup())
    .channel(NioServerSocketChannel.class)
    .childHandler(new ChannelInitializer<NioSocketChannel>() {
        @Override
        protected void initChannel(NioSocketChannel ch) {
            ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                @Override
                public void channelRead(ChannelHandlerContext ctx, Object msg) {
                    ByteBuf buffer = (ByteBuf) msg;
                    System.out.println(buffer.toString(Charset.defaultCharset()));

                    // 建议使用 ctx.alloc() 创建 ByteBuf
                    ByteBuf response = ctx.alloc().buffer();
                    response.writeBytes(buffer);
                    ctx.writeAndFlush(response);

                    // 思考:需要释放 buffer 吗
                    // 思考:需要释放 response 吗
                }
            });
        }
    }).bind(8080);

编写 client

NioEventLoopGroup group = new NioEventLoopGroup();
Channel channel = new Bootstrap()
    .group(group)
    .channel(NioSocketChannel.class)
    .handler(new ChannelInitializer<NioSocketChannel>() {
        @Override
        protected void initChannel(NioSocketChannel ch) throws Exception {
            ch.pipeline().addLast(new StringEncoder());
            ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                @Override
                public void channelRead(ChannelHandlerContext ctx, Object msg) {
                    ByteBuf buffer = (ByteBuf) msg;
                    System.out.println(buffer.toString(Charset.defaultCharset()));

                    // 思考:需要释放 buffer 吗
                }
            });
        }
    }).connect("127.0.0.1", 8080).sync().channel();

channel.closeFuture().addListener(future -> {
    group.shutdownGracefully();
});

new Thread(() -> {
    Scanner scanner = new Scanner(System.in);
    while (true) {
        String line = scanner.nextLine();
        if ("q".equals(line)) {
            channel.close();
            break;
        }
        channel.writeAndFlush(line);
    }
}).start();

💡 读和写的误解

我最初在认识上有这样的误区,认为只有在 netty,nio 这样的多路复用 IO 模型时,读写才不会相互阻塞,才可以实现高效的双向通信,但实际上,Java Socket 是全双工的:在任意时刻,线路上存在A 到 BB 到 A 的双向信号传输。即使是阻塞 IO,读和写是可以同时进行的,只要分别采用读线程和写线程即可,读不会阻塞写、写也不会阻塞读

例如

public class TestServer {
    public static void main(String[] args) throws IOException {
        ServerSocket ss = new ServerSocket(8888);
        Socket s = ss.accept();

        new Thread(() -> {
            try {
                BufferedReader reader = new BufferedReader(new InputStreamReader(s.getInputStream()));
                while (true) {
                    System.out.println(reader.readLine());
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }).start();

        new Thread(() -> {
            try {
                BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(s.getOutputStream()));
                // 例如在这个位置加入 thread 级别断点,可以发现即使不写入数据,也不妨碍前面线程读取客户端数据
                for (int i = 0; i < 100; i++) {
                    writer.write(String.valueOf(i));
                    writer.newLine();
                    writer.flush();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }).start();
    }
}

客户端

public class TestClient {
    public static void main(String[] args) throws IOException {
        Socket s = new Socket("localhost", 8888);

        new Thread(() -> {
            try {
                BufferedReader reader = new BufferedReader(new InputStreamReader(s.getInputStream()));
                while (true) {
                    System.out.println(reader.readLine());
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }).start();

        new Thread(() -> {
            try {
                BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(s.getOutputStream()));
                for (int i = 0; i < 100; i++) {
                    writer.write(String.valueOf(i));
                    writer.newLine();
                    writer.flush();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }).start();
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1897340.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

lua入门(1) - 基本语法

本文参考自&#xff1a; Lua 基本语法 | 菜鸟教程 (runoob.com) 需要更加详细了解的还请参看lua 上方链接 交互式编程 Lua 提供了交互式编程模式。我们可以在命令行中输入程序并立即查看效果。 Lua 交互式编程模式可以通过命令 lua -i 或 lua 来启用&#xff1a; 如下图: 按…

【车载开发系列】J-Link/JFlash 简介与驱动安装方法

【车载开发系列】J-Link/JFlash 简介与驱动安装方法 【车载开发系列】J-Link/JFlash 简介与驱动安装方法 【车载开发系列】J-Link/JFlash 简介与驱动安装方法一. 软件介绍二. 下载安装包二. 开始安装三. 确认安装四. J-Flash的使用 一. 软件介绍 J-Link是SEGGER公司为支持仿真…

springboot校园安全通事件报告小程序-计算机毕业设计源码02445

Springboot 校园安全通事件报告小程序系统 摘 要 随着中国经济的飞速增长&#xff0c;消费者的智能化水平不断提高&#xff0c;许多智能手机和相关的软件正在得到更多的关注和支持。其中&#xff0c;校园安全通事件报告小程序系统更是深得消费者的喜爱&#xff0c;它的出现极大…

Debug-017-elementUI-el-cascader组件首次选择选项不触发表单的自定义校验

前情提要&#xff1a; 今天维护一个表单校验的时候发现一件事情&#xff0c;就是在表单中使用了 el-cascader组件&#xff0c;希望根据接口返回数据去动态校验一下这里面的选项&#xff0c;符合逻辑就通过自定义的表单校验&#xff0c;不符合就在这一项的下面标红提示。做的时候…

Mean teacher are better role models-论文笔记

论文笔记 资料 1.代码地址 2.论文地址 https://arxiv.org/pdf/1703.01780 3.数据集地址 CIFAR-10 https://www.cs.utoronto.ca/~kriz/cifar.html 论文摘要的翻译 最近提出的Temporal Ensembling方法在几个半监督学习基准中取得了最先进的结果。它维护每个训练样本的标签…

C语言图书馆管理系统(管理员版)

案例&#xff1a;图书馆管理系统&#xff08;管理员版&#xff09; 背景&#xff1a; 随着信息技术的发展和普及&#xff0c;传统的图书馆管理方式已经无法满足现代图书馆高效、便捷、智能化的管理需求。传统的手工登记、纸质档案管理不仅耗时耗力&#xff0c;而且容易出现错…

RT-Thread Studio与CubeMX联合编程之rtthread启动

看到了好多文章&#xff0c;在rtthread studio中启用mx&#xff0c;后总是复制mx相关msp函数到rt的board.c文件下&#xff0c;实际使用过程中发现并不需要&#xff0c;这里我们看下rt启动流程&#xff0c;看下到底需要不。 1.打开startup_stm32h743xx.S文件&#xff0c;看下芯片…

区块链技术如何改变供应链管理?

引言 供应链管理在现代商业中扮演着至关重要的角色&#xff0c;确保产品和服务从原材料到最终消费者的顺利流转。然而&#xff0c;当前的供应链管理面临诸多挑战&#xff0c;如信息不透明、数据篡改和效率低下等问题&#xff0c;这些问题严重制约了供应链的整体效能和可信度&am…

用kimi和claude自动生成时间轴图表

做时间轴图表并不难&#xff0c;但是很麻烦&#xff0c;先要大量收集相关事件&#xff0c;然后在一些图表软件中反复调整操作。现在借助AI工具&#xff0c;可以自动生成了。 首先&#xff0c;在kimi中输入提示词来获取某个企业的大事记&#xff1a; 联网检索&#xff0c;元语…

【专业指南】移动硬盘坏道下的数据恢复之道

移动硬盘坏道揭秘&#xff1a;数据安全的隐形挑战 在数据日益成为核心资产的今天&#xff0c;移动硬盘作为便携存储的代名词&#xff0c;承载着无数用户的重要信息。然而&#xff0c;随着使用时间的增长和不当操作的影响&#xff0c;移动硬盘可能会遭遇“坏道”这一棘手问题。…

14-30 剑和诗人4 – 具有长上下文窗口的微调 LLM 的数据设计

LLM 中的长上下文窗口的挑战 微调大型语言模型 (LLM) 面临的最大挑战之一在于处理较长的上下文窗口。LLM 经过大量文本数据训练&#xff0c;能够理解和生成类似人类的语言。然而&#xff0c;在推理过程中&#xff0c;这些模型的上下文窗口有限&#xff0c;通常约为 2,048 个标…

基于MCU平台的HMI开发的性能优化与实战(下)

继上篇《基于MCU平台的HMI开发的性能优化与实战&#xff08;上&#xff09;》深入探讨了提升MCU平台HMI开发效率和应用性能的策略后&#xff0c;本文将专注于NXP i.MX RT1170 MCU平台的仪表盘开发实践。我们将重点介绍Qt for MCUs的优化技巧&#xff0c;展示如何通过实际案例应…

三、分布式软总线的架构设计

软总线的主要架构如下&#xff1a; 软总线主体功能分为发现、组网、连接和传输四个基本模块&#xff0c;实现&#xff1a; 即插即用&#xff1a;快速便捷发现周边设备。 自由流转&#xff1a;各设备间自组网&#xff0c;任意建立业务连接&#xff0c;实现自由通信。 高效传…

Ubuntu 22.04远程自动登录桌面环境

如果需要远程自动登录桌面环境&#xff0c;首先需要将Ubuntu的自动登录打开&#xff0c;在【settings】-【user】下面 然后要设置【Sharing】进行桌面共享&#xff0c;Ubuntu有自带的桌面共享功能&#xff0c;不需要另外去安装xrdp或者vnc之类的工具了 点开【Remote Desktop】…

Zerotier+Parsec五分钟实现外网远程访问校园或公司内网

0 需求 校园网或公司内网是不能直接通过远程控制桌面软件访问的&#xff0c;想要实现&#xff0c;就必须通过三方的服务来实现穿透内网。但是这样的缺点就是存在延迟。 1 安装软件 &#xff08;1&#xff09;Zerotier 是内网穿透软件&#xff0c;在两台设备上都要安装&#…

LabVIEW图像分段线性映射

介绍了如何使用LabVIEW对图像进行分段线性映射处理&#xff0c;通过对特定灰度值区间进行不同的线性映射调整&#xff0c;以优化图像的显示效果。案例中详细展示了如何配置和使用LabVIEW中的图像处理工具&#xff0c;包括设置分段区间、计算映射参数和应用映射函数等步骤。 实…

我独立开发生涯的第一个商业化产品 - 微寻

2024 年 04 月 27 日晚八点&#xff0c;微寻 终于正式上线了。时隔一周&#xff0c;我在五一假期的最后一天写下此文&#xff0c;以纪念这款我独立开发生涯的第一个商业化产品。 1. 何为微寻 微寻 为个人网站提供微信码登录 能力。 没错&#xff0c;微寻 是一个小型 SaaS&am…

【数据结构】07.循环队列

一、循环队列的定义 定义&#xff1a;队列主要有顺序队列&#xff0c;循环队列&#xff0c;双端队列&#xff0c;优先队列。而当中循环队列是一种线性数据结构。它也被称为“环形缓冲器”。它只允许在一端进行插入操作&#xff0c;即队尾&#xff08;rear&#xff09;&#xf…

【elasticsearch】IK分词器添加自定义词库,然后更新现有的索引

进入elasticsearch中的plugins位置&#xff0c;找到ik分词器插件&#xff0c;进入ik插件的config文件夹&#xff0c;当中有一个IKAnalyzer.cfg.xml配置文件。使用vim编辑器修改配置文件&#xff1a; vim IKAnalyzer.cfg.xml 配置文件如下&#xff08;添加了自定义字典的位置&…

springboot交流论坛网站-计算机毕业设计源码00304

摘要 随着信息技术在管理上越来越深入而广泛的应用&#xff0c;管理信息系统的实施在技术上已逐步成熟。本文介绍了交流论坛网站的开发全过程。通过分析交流论坛网站管理的不足&#xff0c;创建了一个计算机管理交流论坛网站的方案。文章介绍了交流论坛网站的系统分析部分&…