Netty(2)

news2025/1/13 7:23:02

Netty

文章目录

  • Netty
  • 4 Netty 模型
    • 4.1 Netty 模型介绍
    • 4.2 Netty demo
    • 4.3 Netty 异步模型
      • 4.3.1 基本介绍
      • 4.3.2 异步模型
      • 4.3.3 Future-Listener 机制
      • 4.4 Netty 任务队列 task

4 Netty 模型

4.1 Netty 模型介绍

Netty 线程模式:Netty 主要基于主从 Reactor 多线程模型做了一定的改进,其中主从Reactor 多线程模型有多Reactor
在这里插入图片描述
在这里插入图片描述

  1. Netty 抽象出两组线程池 BoosGroup 和 WorkGroup
  2. BoosGroup 专门负责接收客户端连接
  3. WorkGroup 专门负责网络的读写
  4. BoosGroup、WorkGroup 类型都是 NioEventLoopGroup
  5. NioEventLoopGroup 是一个事件循环组,可以是多个线程,组中包含多个事件循环,每个事件循环都是 NioEventLoop
  6. NioEventLoop 表示一个事件循环,不断循环的执行处理任务的线程,每个 NioEventLoop 都有一个 selector,用于监听绑定在其上的 socket 网络通讯
  7. Boos NioEventLoop 执行步骤
    • 轮询 accept 事件
    • 处理 select 事件,与 client 建立连接,生成 NioSocketChannnel,并将其注册到某个 work NioEventLoop 上的 selector
    • runAllTasks 处理任务队列
  8. Work NioEventLoop 执行步骤
    • 轮询 read、write 事件
    • 在对应的 NioSocketChannel 处理 read、write 事件
    • runAllTasks 处理任务队列
  9. 每个 work NioEventLoop 处理业务时,会使用 pipline,pipline 中包含 channel,即可以通过 pipline 获取到对应的 channel,并且 pipline 中也维护了很多处理器

4.2 Netty demo

maven

<dependency>
  <groupId>io.netty</groupId>
  <artifactId>netty-all</artifactId>
  <version>4.1.20.Final</version>
</dependency>

server

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.CharsetUtil;

public class NettyServer {

    public static void main(String[] args) {

        EventLoopGroup boosGroup = null;
        EventLoopGroup workGroup = null;

        try {

            // 创建 boosGroup 一直循环只处理连接请求,真正的业务交由 workGroup 处理
            boosGroup = new NioEventLoopGroup();

            // 创建 workGroup 处理 read write 事件
            workGroup = new NioEventLoopGroup();

            ServerBootstrap b = new ServerBootstrap();

            b.group(boosGroup, workGroup) // 配置boosGroup workGroup
                    .channel(NioServerSocketChannel.class) // 使用NioSocketChannel 作为服务器的通道实现
                    .option(ChannelOption.SO_BACKLOG, 64) // 设置线程队列得到连接个数
                    .childOption(ChannelOption.SO_KEEPALIVE, true) // 设置保持活动连接状态
                    .handler(new LoggingHandler(LogLevel.INFO)) // handler 在 BoosGroup 中生效
                    .childHandler(new ChannelInitializer<SocketChannel>() { // childHandler 在 WorkGroup 中生效
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {

                            System.out.println("初始化server端channel对象...");

                            ChannelPipeline pipeline = ch.pipeline();

                            pipeline.addLast(new ChannelInboundHandlerAdapter() {
                                /**
                                 * 读取客户端发送的数据
                                 * @param ctx 上下文对象, 含有 管道pipeline , 通道channel, 地址 等
                                 * @param msg 客户端发送的数据 默认Object
                                 * @throws Exception
                                 */
                                @Override
                                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

                                    System.out.println("server 端正在接收 client 端的数据......");

                                    ByteBuf buffer = (ByteBuf) msg;

                                    System.out.println("client:" + ctx.channel().remoteAddress() + " 发送过来的数据 msg = " + buffer.toString(CharsetUtil.UTF_8));

                                }

                                @Override
                                public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
                                    System.out.println("server 数据读取完毕 .....");
                                    ctx.writeAndFlush(Unpooled.copiedBuffer("server 以读取 client 发送的数据...", CharsetUtil.UTF_8));
                                }

                                @Override
                                public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                                    System.out.println("出现异常:" + cause.getMessage());
                                    // 关闭通道
                                    ctx.close();
                                }
                            });

                        }
                    });

            System.out.println("server 端 ready !!!!");

            ChannelFuture channelFuture = b.bind("127.0.0.1", 8090).sync();

            // 给 ChannelFuture 注册监听器
            channelFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (future.isSuccess()) System.out.println("server 端监听 8090 端口 成功....");
                    else System.out.println("server 端监听 8090 端口 失败....");
                }
            });

            //关闭通道
            channelFuture.channel().closeFuture().sync();


        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {

            boosGroup.shutdownGracefully();
            workGroup.shutdownGracefully();

        }


    }

}


client

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.CharsetUtil;

public class NettyClient {

    public static void main(String[] args) {

        // 客户端需要一个事件循环组
        NioEventLoopGroup group = null;

        try {

            group = new NioEventLoopGroup();

            // 创建客户端启动对象
            Bootstrap b = new Bootstrap();

            b.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {

                            ChannelPipeline pipeline = ch.pipeline();

                            pipeline.addLast(new ChannelInboundHandlerAdapter() {

                                @Override
                                public void channelActive(ChannelHandlerContext ctx) throws Exception {
                                    System.out.println("当通道就绪就会触发该方法.....");
                                    ctx.writeAndFlush(Unpooled.copiedBuffer("client 通道已就绪...", CharsetUtil.UTF_8));
                                }

                                @Override
                                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

                                    ByteBuf buffer = (ByteBuf) msg;

                                    System.out.println("client 读取 server 发送的数据 msg = " + buffer.toString(CharsetUtil.UTF_8));

                                }

                                @Override
                                public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                                    System.out.println("出现异常,异常信息 cause = " + cause.getMessage());
                                    // 关闭通道
                                    ctx.close();
                                }
                            });

                        }
                    });

            System.out.println("client 端已 ok ...");

            // 启动客户端去连接服务器端
            ChannelFuture channelFuture = b.connect("127.0.0.1", 8090).sync();

            channelFuture.channel().closeFuture().sync();


        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            group.shutdownGracefully();
        }


    }

}

  1. Netty 抽象出两组线程池,BossGroup 专门负责接收客户端连接,WorkerGroup 专门负责网络读写操作
  2. NioEventLoop 表示一个不断循环执行处理任务的线程,每个 NioEventLoop 都有一个 selector,用于监听绑定在其上的 socket 网络通道。
  3. NioEventLoop 内部采用串行化设计,从消息的读取->解码->处理->编码->发送,始终由 IO 线程 NioEventLoop 负责
  4. NioEventLoopGroup下包含多个 NioEventLoop
  5. 每个 NioEventLoop 中包含有一个 Selector,一个 taskQueue
  6. 每个 NioEventLoop 的 Selector 上可以注册监听多个 NioChannel
  7. 每个 NioChannel 只会绑定在唯一的 NioEventLoop 上
  8. 每个 NioChannel 都绑定有一个自己的 ChannelPipeline

4.3 Netty 异步模型

4.3.1 基本介绍

  1. 异步的概念和同步相对
  2. 当一个异步过程调用发出后,调用者不能立刻得到结果,实 际处理这个调用的组件在完成后,通过状态、通知和回调来通知调用者
  3. Netty中的I/O操作是异步的,包括Bind、Write、Connect 等操作会简单的返回一个 ChannelFuture
  4. 调用者并不能立刻获得结果,而是通过Future-Listener 机制,用户可以方便的主动获 取或者通过通知机制获得IO操作结果
  5. Netty 的异步模型是建立在 future 和 callback 的之上的
  6. callback 回调
  7. Future 的核心思想是:假设一个方法fun,计算过程可能非常耗时,等待 fun 返回 显然不合适。那么可以在调用 fun 的时候,立马返回一个Future,后续可以通过 Future 去监控方法 fun 的处理过程(即:Future-Listener 机制)

4.3.2 异步模型

在这里插入图片描述

  1. 在使用 Netty 进行编程时,拦截操作和转换出入站数据只需要提供 callback 或利用 future 即可。这使得链式操作简单、高效,并有利于编写可重用的、通用的代码。
  2. Netty框架的目标就是让你的业务逻辑从网络基础应用编码中分离出来、解脱出来

4.3.3 Future-Listener 机制

当 Future 对象刚刚创建时,处于非完成状态,调用者可以通过返回的 ChannelFuture 来获取操作执行的状态,注册监听函数来执行完成后的操作。

常见有如下操作

isDone判断当前操作是否完成
isSuccess判断已完成的当前操作是否成功
getCause获取已完成的当前操作失败的原因
isCancelled判断已完成的当前操作是否被取消
addListener注册监听器 当操作已完成(isDone方法返回完成),将会通知 指定的监听器;如果Future对象已完成,则通知指定的监听器

相比传统阻塞I/O,执行I/O操作后线程会被阻塞住,直到操作完成
异步处理的好处是不会造成线程阻塞,线程在I/O操作期间可以执行别的程序,在高并发情形下会更稳定和更高的吞吐量

4.4 Netty 任务队列 task

处理耗时操作

  • 用户程序自定义普通任务
    将任务提交 taskQueue中 但还是一个线程在执行
  • 定时提交任务
    将任务提交 scheduledTaskQueue 使用不同的线程
childHandler(new ChannelInitializer<SocketChannel>() { // childHandler 在 WorkGroup 中生效
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {

                            System.out.println("初始化server端channel对象...");

                            System.out.println("ChannelInitializer thread name = " + Thread.currentThread().getName());

                            ChannelPipeline pipeline = ch.pipeline();

                            pipeline.addLast(new ChannelInboundHandlerAdapter() {
                                /**
                                 * 读取客户端发送的数据
                                 *
                                 * @param ctx 上下文对象, 含有 管道pipeline , 通道channel, 地址 等
                                 * @param msg 客户端发送的数据 默认Object
                                 * @throws Exception
                                 */
                                @Override
                                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

                                    System.out.println("server 端正在接收 client 端的数据......");

                                    System.out.println("ChannelInitializer channelRead thread name = " + Thread.currentThread().getName());


                                    System.out.println("ChannelInitializer channelRead 普通handle thread name = " + Thread.currentThread().getName());
                                    ByteBuf buffer0 = (ByteBuf) msg;
                                    System.out.println("client:" + ctx.channel().remoteAddress() + " 普通handle 发送过来的数据 msg = " + buffer0.toString(CharsetUtil.UTF_8));


                                    ctx.channel().eventLoop().execute(() -> {
                                        try {
                                            TimeUnit.SECONDS.sleep(60);
                                        } catch (InterruptedException e) {
                                            e.printStackTrace();
                                        }
                                        System.out.println("ChannelInitializer channelRead taskQueue handle thread name = " + Thread.currentThread().getName());
                                        ByteBuf buffer1 = (ByteBuf) msg;
                                        System.out.println("client:" + ctx.channel().remoteAddress() + " 发送过来的数据 msg = " + buffer1.toString(CharsetUtil.UTF_8));
                                    });


                                    ctx.channel().eventLoop().schedule(() -> {
                                        System.out.println("ChannelInitializer channelRead scheduleQueue handle thread name = " + Thread.currentThread().getName());
                                        ByteBuf buffer2 = (ByteBuf) msg;
                                        System.out.println("client:" + ctx.channel().remoteAddress() + " 发送过来的数据 msg = " + buffer2.toString(CharsetUtil.UTF_8));
                                    }, 60, TimeUnit.SECONDS);

                                    System.out.println("server doing...");

                                }

                                @Override
                                public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
                                    System.out.println("server 数据读取完毕 .....");
                                    ctx.writeAndFlush(Unpooled.copiedBuffer("server 以读取 client 发送的数据...", CharsetUtil.UTF_8));
                                }

                                @Override
                                public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                                    System.out.println("出现异常:" + cause.getMessage());
                                    // 关闭通道
                                    ctx.close();
                                }
                            });

                        }
                    })

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/494938.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

2023年开源社项目委员会介绍

2023 项目委员会成员 项目委员会主席&#xff1a;石垚 &#xff08;tech-querykaiyuanshe.org&#xff09; 项目委员会秘书 &#xff1a;丁文昊 &#xff08;dingwenhaokaiyuanshe.org&#xff09; 开源社官网项目组&#xff1a; 组长&#xff1a;石垚 &#xff08;tech-queryk…

2023 ATTCK v13版本更新指南

一、什么是ATT&CK ATT&CK&#xff08;Adversarial Tactics, Techniques, and Common Knowledge &#xff09;是一个攻击行为知识库和模型&#xff0c;主要应用于评估攻防能力覆盖、APT情报分析、威胁狩猎及攻击模拟等领域。 二、ATT&CK 发展历史 1996年&#xff1…

【UE】高级载具插件-04-坦克瞄准开火

在上一篇文章中&#xff08;【UE】高级载具插件-03-子弹击中目标时使目标破碎&#xff09;&#xff0c;我们实现了坦克开火的功能。本篇博客介绍的是实现坦克瞄准开火的功能。 效果 步骤 1. 首先将学习FPS游戏时用到的动态准心控件蓝图资源导入 2. 在项目设置中增加两个操作…

穿越有序链表的迷宫:探索力扣“合并两个有序链表”的解题思路

本篇博客计划讲解力扣“21. 合并两个有序链表”这道题&#xff0c;这是题目链接。 老规矩&#xff0c;先来审下题干。 输出示例如下&#xff1a; 提示&#xff1a; 这道题目相当经典&#xff0c;同时是校招的常客。大家先思考一下&#xff0c;再来听我讲解。 思路&…

7.1 幂法和反幂法

学习目标&#xff1a; 如果我要学习幂法及反幂法&#xff0c;我会遵循以下步骤&#xff1a; 1. 学习理论知识&#xff1a;首先我会找到可靠的教材或者网上资源&#xff0c;学习幂法及反幂法的理论知识&#xff0c;包括其原理、公式、算法流程、收敛性等方面的内容。这些知识可…

Cadence Allegro BGA类器件扇孔操作教程

对于BGA扇孔&#xff0c;同样过孔不宜打孔在焊盘上&#xff0c;推荐打孔在两个焊盘的中间位置。很多工程师为了出线方便&#xff0c;随意挪动BGA里面过孔的位置&#xff0c;甚至打在焊盘上面&#xff0c;如图1所示&#xff0c;从而造成BGA区域过孔不规则&#xff0c;易造成后期…

3.shell脚本例子

文章目录 1.计算从1到100所有整数的和2.提示用户输入一个小于100的整数&#xff0c;并计算从1到该数之间所有整数的和3.求从1到100所有整数的偶数和、奇数和4.用户名存放在users.txt文件中&#xff0c;每行一个&#xff0c;判断文件里的用户是否存在&#xff0c;若该用户存在&a…

【Java EE】-Servlet(一) 创建Maven下的webapp项目

作者&#xff1a;学Java的冬瓜 博客主页&#xff1a;☀冬瓜的主页&#x1f319; 专栏&#xff1a;【JavaEE】 分享: 在满园弥漫的沉静的光芒之前&#xff0c;一个人更容易看到时间&#xff0c;并看到自己的身影。——史铁生《我与地坛》 主要内容&#xff1a;创建一个基于maven…

【云计算•云原生】5.云原生之初识OpenStack

文章目录 OpenStack起源OpenStack基本组件HorizonNovaSwiftCinderKeystoneNeutronGlanceCeilometerTroveHeat OpenStack简单框架模型 OpenStack起源 OpenStack是一个由NASA和Rackspace合作研发并发起的&#xff0c;以Aapache许可证授权的自由软件和开放源代码项目。为公有云及…

JS 实现区块链分布式网络

JS 实现区块链分布式网络 这里主要 JS 实现区块链 实现的部分继续下去&#xff0c;对 Blockchain 这个对象有一些修改&#xff0c;如果使用 TS 的话可能要修改对应的 interface&#xff0c;但是如果是 JS 的话就无所谓了。 需要安装的依赖有&#xff1a; express body-parse…

字节跳动ByteHouse与亚马逊云科技携手打造新一代云数仓服务

随着全球化的发展&#xff0c;越来越多的中国企业开始涉足海外市场&#xff0c;开展跨境业务。在这个过程中&#xff0c;强大的数据分析能力是出海企业不可或缺的重要一环。通过有效的数据分析&#xff0c;能帮助企业更好地了解全球市场对产品的需求便于调整产品战略&#xff0…

微服务---RabbitMQ进阶(消息可靠性,延迟队列,惰性队列,集群部署)

RabbitMQ进阶(消息可靠性,延迟队列,惰性队列,集群部署) 消息队列在使用过程中&#xff0c;面临着很多实际问题需要思考&#xff1a; 1.消息可靠性 消息从发送&#xff0c;到消费者接收&#xff0c;会经理多个过程&#xff1a; 其中的每一步都可能导致消息丢失&#xff0c;常见…

Python每日一练(20230506) 存在重复元素I、II、III

目录 1. 存在重复元素 Contains Duplicate I 2. 存在重复元素 Contains Duplicate II 3. 存在重复元素 Contains Duplicate III &#x1f31f; 每日一练刷题专栏 &#x1f31f; Golang每日一练 专栏 Python每日一练 专栏 C/C每日一练 专栏 Java每日一练 专栏 1. 存在重…

项目管理-数据管理能力DCMM模型

DCMM 数据管理能力成熟度评估模型简介 DCMM&#xff08;Data Management Capability Maturity Assessment Model&#xff0c;数据管理能力成熟度评估模型&#xff09;是我国首个数据管理领域国家标准&#xff0c;将组织内部数据能力划分为八个重要组成部分&#xff0c;描述了每…

重新定义座舱智能化的下半场?谁能抓住弯道超车的窗口期

2020年&#xff0c;高通8155上车之前&#xff0c;行业的定义更多是4G联网互联网生态&#xff08;智能手机的复刻&#xff09;&#xff0c;以斑马智行为代表&#xff1b;而随着集成异构计算、高性能AI引擎&#xff08;8TOPS算力&#xff09;的高通8155密集上车&#xff0c;驱动行…

车载多屏互动联动动画版本同屏幕大小情况方案设计--众筹项目

hi&#xff0c;粉丝朋友们&#xff1a; 背景及成果展示 本节带大家来开始学习多屏幕互动的动画版本设计&#xff0c;回忆一下我们已经在之前blog和wms课程中学习了多屏互动的非动画版本如下&#xff1a; 再来看看今天我们想要实现有动画版本的成果&#xff1a; 是不是相比之…

多维时序 | MATLAB实现基于VMD-SSA-LSSVM、SSA-LSSVM、VMD-LSSVM、LSSVM的多变量时间序列预测对比

多维时序 | MATLAB实现基于VMD-SSA-LSSVM、SSA-LSSVM、VMD-LSSVM、LSSVM的多变量时间序列预测对比 目录 多维时序 | MATLAB实现基于VMD-SSA-LSSVM、SSA-LSSVM、VMD-LSSVM、LSSVM的多变量时间序列预测对比预测效果基本介绍程序设计学习总结参考资料 预测效果 基本介绍 多维时序 …

全景环视搭载率突破30%,本土供应商在细分市场突围而出

随着行泊一体、AVP等功能成为智能驾驶赛道新周期的主角&#xff0c;背后支撑落地的全景环视&#xff08;也称为360环视&#xff09;方案也不再是传统功能定义场景&#xff08;为驾驶员提供泊车及盲区辅助&#xff09;下的应用&#xff0c;同时&#xff0c;环视与周视的硬件复用…

【Mybatis-Plus笔记01】整合Springboot实现基础配置和增删改查案例

【Mybatis-Plus笔记01】整合Springboot实现基础配置和增删改查案例 【一】Mybatis-Plus的简单介绍【1】MP的特特性有哪些【2】MP的框架结构 【二】MP的使用案例&#xff08;1&#xff09;准备开发环境&#xff08;2&#xff09;添加pom依赖&#xff08;3&#xff09;编写yml配置…

基于SpringBoot+Vue实现的体检录入系统

【简介】 本体检信息录入系统采用前端&#xff1a;vue&#xff1b;后端&#xff1a;springbootmybatis-plusredismysql技术架构开发&#xff0c;前后端分离&#xff0c;容易上手。除了基本的体检结果查询、录入及导出外&#xff0c;在录入中还能对录入信息进行智能计算。 【功…