Netty初探:掌握高性能网络通信框架,提升Java网络编程技能

news2025/1/11 18:39:39

Netty初探

NIO 的类库和 API 繁杂 , 使用麻烦: 需要熟练掌握Selector ServerSocketChannelSocketChannel ByteBuffer等。

开发工作量和难度都非常大: 例如客户端面临断线重连、 网络闪断、心跳处理、半包读写、 网络拥塞和异常流的处理等等。

Netty 对 JDK 自带的 NIO 的 API 进行了良好的封装 ,解决了上述问题。且Netty拥有高性能、 吞吐量更高 ,延迟更  ,减少资源消耗 ,最小化不必要的内存复制等优点。

Netty 现在都在用的是4.x  5.x版本已经废弃  Netty 4.x 需要JDK 6以上版本支持

Netty的使用场景:

1)互联网行业:在分布式系统中 ,各个节点之间需要远程服务调用 ,高性能的 RPC 框架必不可少  Netty 作为异步 高性能的通信框架 ,往往作为基础通信组件被这些 RPC 框架使用。典型的应用有 :阿里分布式服务框架 Dubbo 的   RPC 框架使用 Dubbo 协议进行节点间通信  Dubbo 协议默认使用 Netty 为基础通信组件 ,用于实现。各进程节 点之间的内部通信。 Rocketmq底层也是用的Netty作为基础通信组件。

2)游戏行业:无论是手游服务端还是大型的网络游戏 Java 语言得到了越来越广泛的应用。 Netty 作为高性能的基 础通信组件 ,它本身提供了 TCP/UDP  HTTP 协议栈。

3)大数据领域 :经典的 Hadoop 的高性能通信和序列化组件 Avro  RPC 框架 ,默认采用 Netty 进行跨界点通  ,它的 Netty Service 基于 Netty 框架二次封装实现。

netty相关开源项目 Netty.docs: Related projects

Netty通讯示例

Nettymaven依赖:

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty ‐all</artifactId>
    <version>4.1.35.Final</version>
</dependency>

服务端代码:

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class NettyServer {

    public static void main(String[] args) throws Exception {

        // 创建两个线程组bossGroup和workerGroup, 含有的子线程NioEventLoop的个数默认为cpu核数的两倍
        // bossGroup只是处理连接请求 ,真正的和客户端业务处理,会交给workerGroup完成
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            // 创建服务器端的启动对象
            ServerBootstrap bootstrap = new ServerBootstrap();

            // 使用链式编程来配置参数
            bootstrap.group(bossGroup, workerGroup) //设置两个线程组
                    .channel(NioServerSocketChannel.class) //使用NioServerSocketChannel作为服务器的通道实现
                    // 初始化服务器连接队列大小,服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接。
                    // 多个客户端同时来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childHandler(new ChannelInitializer<SocketChannel>() {//创建通道初始化对象,设置初始化参数
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {//对workerGroup的SocketChannel设置处理器
                            ch.pipeline().addLast(new NettyServerHandler());
                        }
                    });
            System.out.println("netty server start。。");

            // 绑定一个端口并且同步, 生成了一个Channel Future异步对象,通过isDone()等方法可以判断异步事件的执行情况
            // 启动服务器(并绑定端口),bind是异步操作,sync方法是等待异步操作执行完毕
            ChannelFuture cf = bootstrap.bind(9000).sync();

            // 对通道关闭进行监听,closeFuture是异步操作,监听通道关闭
            // 通过sync方法同步等待通道关闭处理完毕,这里会阻塞等待通道关闭完成
            cf.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        System.out.println("客户端发来的消息:" + buf.toString(CharsetUtil.UTF_8));
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush(Unpooled.copiedBuffer("Hello, 客户端~", CharsetUtil.UTF_8));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

客户端代码:

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.CharsetUtil;

public class NettyClient {
    public static void main(String[] args) throws Exception {
        // 客户端需要一个事件循环组
        EventLoopGroup group = new NioEventLoopGroup();

        try {
            // 创建客户端启动对象
            // 注意客户端使用的不是 ServerBootstrap 而是 Bootstrap
            Bootstrap bootstrap = new Bootstrap();

            // 设置相关参数
            bootstrap.group(group) // 设置线程组
                    .channel(NioSocketChannel.class) // 使用 NioSocketChannel 作为客户端的通道实现
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel channel) throws Exception {
                            // 加入处理器
                            channel.pipeline().addLast(new NettyClientHandler());
                        }
                    });

            System.out.println("netty client start");

            // 启动客户端去连接服务器端
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9000).sync();

            // 对关闭通道进行监听
            channelFuture.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }
}

        

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

public class NettyClientHandler extends ChannelInboundHandlerAdapter {

    /**
     * 当客户端连接服务器完成就会触发该方法
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ByteBuf buf = Unpooled.copiedBuffer("HelloServer", CharsetUtil.UTF_8);
        ctx.writeAndFlush(buf);
    }

    // 当通道有读取事件时会触发,即服务端发送数据给客户端
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        System.out.println("收到服务端的消息 :" + buf.toString(CharsetUtil.UTF_8));
        System.out.println("服务端的地址: " + ctx.channel().remoteAddress());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

        看完代码 ,我们发现Netty框架的目标就是让你的业务逻辑从网络基础应用编码中分离出来 ,让你可以专注业务的开 发 ,而不需写一大堆类似NIO的网络处理操作。

Netty线程模型

可以先理解下《Scalable IO in Java》这篇文章里说的一些IO处理模式  Netty的线程模型如下图所示:

Netty模块组件

 BootstrapServerBootstrap】:

Bootstrap 意思是引导 ,一个 Netty 应用通常由一个 Bootstrap 开始 ,主要作用是配置整个 Netty 程序  串联各个组   Netty  Bootstrap 类是客户端程序的启动引导类 ServerBootstrap 是服务端启动引导类。

 FutureChannel Future】:

正如前面介绍 ,在 Netty 中所有的 IO 操作都是异步的 ,不能立刻得知消息是否被正确处理。

但是可以过一会等它执行完成或者直接注册一个监听 ,具体的实现就是通过 Future 和Channel Futures ,他们可以注 册一个监听 ,当操作执行成功或失败时监听会自动触发注册的监听事件。

Channel】:

Netty 网络通信的组件 ,能够用于执行网络 I/O 操作。Channel 为用户提供:

1)当前网络连接的通道的状态(例如是否打开?是否已连接?)

2) 网络连接的配置参数 (例如接收缓冲区大小)

3)提供异步的网络 I/O 操作(如建立连接 ,读写 ,绑定端口) ,异步调用意味着任何 I/O 调用都将立即返回 ,并且不保 证在调用结束时所请求的 I/O 操作已完成。

4)调用立即返回一个 Channel Future 实例 ,通过注册监听器到 Channel Future 上 ,可以 I/O 操作成功、失败或取 消时回调通知调用方。

5)支持关联 I/O 操作与对应的处理程序。

不同协议、不同的阻塞类型的连接都有不同的 Channel 类型与之对应。

下面是一些常用的 Channel 类型:

1  NioSocketChannel,异步的客户端 TCP Socket 连接。
2  NioServerSocketChannel,异步的服务器端 TCP Socket 连接。
3  NioDatagramChannel,异步的 UDP 连接。
4  NioSctpChannel,异步的客户端 Sctp 连接。
5  NioSctpServerChannel,异步的 Sctp 服务器端连接。
6  这些通道涵盖了 UDP 和 TCP 网络 IO 以及文件 IO。

Selector】:

Netty 基于 Selector 对象实现 I/O 多路复用 ,通Selector 一个线程可以监听多个连接的 Channel 事件。

当向一个 Selector 中注册 Channel 后 Selector 内部的机制就可以自动不断地查询(Select) 这些注册的 Channel 是 否有已就绪的 I/O 事件(例如可读 ,可写  网络连接完成等 这样程序就可以很简单地使用一个线程高效地管理多   Channel 。

 NioEventLoop】:

NioEventLoop 中维护了一个线程和任务队列 ,支持异步提交执行任务 ,线程启动时会调用 NioEventLoop 的 run 方  ,执行 I/O 任务和非 I/O 任务:

I/O 任务 ,即 selectionKey  ready 的事件 ,如 acceptconnect readwrite 等   processSelected Keys 方 法触发。

 IO 任务 ,添加到 taskQueue 中的任务 ,如 register0、 bind0 等任务   runAllTasks 方法触发。

 NioEventLoopGroup】:

NioEventLoopGroup ,主要管理 eventLoop 的生命周期 ,可以理解为一个线程池  内部维护了一组线程 ,每个线程 (NioEventLoop)负责处理多个 Channel 上的事件 ,而一个 Channel 只对应于一个线程。

Channel Handler】:

Channel Handler 是一个接口 ,处理 I/O 事件或拦截 I/O 操作 ,并将其转发到其Channel Pipeline(业务处理链)中的 下一个处理程序。

Channel Handler 本身并没有提供很多方法  因为这个接口有许多的方法需要实现 ,方便使用期间 ,可以继承它的子 类:

1  ChannelInboundHandler 用于处理入站 I/O 事件。 
2  ChannelOutboundHandler 用于处理出站 I/O 操作。

或者使用以下适配器类:

1 ChannelInboundHandlerAdapter  用于处理入站 I/ 0 事件。
2  ChannelOutboundHandlerAdapter  用于处理出站 I/0   操作。

ChannelHandlerContext:

保存 Channel 相关的所有上下文信息,同时关联一个 ChannelHandler 对象。

ChannelPipline:

保存 ChannelHandler  List,  用于处理或拦截 Channel  的入站事件和出站操作。

ChannelPipeline 实现了一种高级形式的拦截过滤器模式,使用户可以完全控制事件的处理方式,以及 Channel 中各 个的 ChannelHandler  如何相互交互。

        一个 Channel 包含了一个 ChannelPipeline,    ChannelPipeline  中又维护了一个由 ChannelHandlerContext    组成的双向链表,并且每个 ChannelHandlerContext       中又关联着一个 ChannelHandler

        read事件(入站事件)和write 事件(出站事件)在一个双向链表中,入站事件会从链表 head 往后传递到最后一个入站的 handler, 出站事件会从链表 tail 往前传递到最前一个出站的 handler, 两种类型的 handler 互不干扰。

ByteBuf详解

        从结构上来说, ByteBuf   由一串字节数组构成。数组中每个字节用来存放信息。

        ByteBuf 提供了两个索引, 一个用于读取数据, 一个用于写入数据。这两个索引通过在字节数组中移动,来定位需要读或者写信息的位置。

        当从 ByteBuf  读取时,它的 readerlndex    (读索引)将会根据读取的字节数递增。

同样,当写 ByteBuf  时,它的 writerlndex    也会根据写入的字节数进行递增。

需要注意的是极限的情况是 readerlndex 刚好读到了 writerlndex 写入的地方。

如果 readerlndex    超过了 writerlndex       Netty   会抛出 IndexOutOf-BoundsException 异常。

示例代码:

public class NettyByteBuf {
    public static void main(String[] args) {
        // 创建byteBuf对象,该对象内部包含一个字节数组byte[10]
        // 通过readerindex和writerIndex和capacity, 将buffer分成三个区域
        // 已经读取的区域:(0,readerindex)
        // 可读取的区域: (readerindex,writerIndex)
        // 可写的区域: (writerIndex,capacity)
        ByteBuf byteBuf = Unpooled.buffer(10);
        System.out.println("byteBuf=" + byteBuf);

        for (int i = 0; i < 8; i++) {
            byteBuf.writeByte(i);
        }
        System.out.println("byteBuf=" + byteBuf);

        for (int i = 0; i < 5; i++) {
            System.out.println(byteBuf.getByte(i));
        }
        System.out.println("byteBuf=" + byteBuf);

        for (int i = 0; i < 5; i++) {
            System.out.println(byteBuf.readByte());
        }
        System.out.println("byteBuf=" + byteBuf);

        // 用Unpooled工具类创建ByteBuf
        ByteBuf byteBuf2 = Unpooled.copiedBuffer("hello,笑傲!", CharsetUtil.UTF_8);

        // 使用相关的方法
        if (byteBuf2.hasArray()) {
            byte[] content = byteBuf2.array();
            // 将 content 转成字符串
            System.out.println(new String(content, CharsetUtil.UTF_8));
            System.out.println("byteBuf=" + byteBuf2);

            System.out.println(byteBuf2.readerIndex()); // 0
            System.out.println(byteBuf2.writerIndex()); // 12
            System.out.println(byteBuf2.capacity()); // 36

            System.out.println(byteBuf2.getByte(0)); // 获取数组0这个位置的字符h的ascii码,h=104

            int len = byteBuf2.readableBytes(); // 可读的字节数 12
            System.out.println("len=" + len);

            // 使用for取出各个字节
            for (int i = 0; i < len; i++) {
                System.out.println((char) byteBuf2.getByte(i));
            }

            // 范围读取
            System.out.println(byteBuf2.getCharSequence(0, 6, CharsetUtil.UTF_8));
            System.out.println(byteBuf2.getCharSequence(6, 6, CharsetUtil.UTF_8));
        }
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1359154.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

RKE安装k8s及部署高可用rancher之证书在外面的7层LB(nginx中) 7层负载均衡

一 了解 Rancher 1 推荐架构 安装 Rancher 的方式有两种&#xff1a;单节点安装和高可用集群安装。因为单节点安装只适用于测试和 demo 环境&#xff0c;而且单节点安装和高可用集群安装之间不能进行数据迁移&#xff0c;所以推荐从一开始就使用高可用集群安装的方式安装 Ran…

业务项目中Echarts图表组件的封装实践方案

背景&#xff1a;如果我们的项目是一个可视化类/营销看板类/大屏展示类业务项目&#xff0c;不可避免的会使用到各种图表展示。那在一个项目中如何封装一个图表组件既能够快速复用、UI统一&#xff0c;又可以灵活扩充Echarts的各种复杂配置项配置就变得极为重要。 封装目标 符…

HttpRunner自动化测试之实现参数化传递

参数化实现及重复执行 参数化测试&#xff1a;在接口测试中&#xff0c;为了实现不同组数据对同一个功能模块进行测试&#xff0c;需要准备多组测试数据对模块进行测试的过程。 在httprunner中可以通过如下方式实现参数化&#xff1a; 1、在YAML/JSON 中直接指定参数列表 2、…

C之BS开发

一、 BS 概述与 boa 搭建 1.1 BS 模式开发概述 BS 模式&#xff1a; 浏览器与服务器模式&#xff0c; 即通过浏览器访问服务器的 Web 资源。 1.1.1 web 前端开发技术 主要包含&#xff1a; HTML 、 CSS 、 XML/JSON 、 Javascript 、 AJAX HTML 超文本标记语言 ( 英文全称…

【Element】el-form和el-table嵌套实现表格编辑并提交表单校验

一、背景 页面需要用到表格采集用户数据&#xff0c;提交时进行表单校验&#xff1b;即表格中嵌套着表单&#xff0c;保存时进行表单校验 二、功能实现 2.1、el-form和el-table嵌套说明 ① :model"formData" 给表单绑定数据&#xff0c;formData是表单的数据对象 …

【docker】网络模式管理

目录 一、Docker网络实现原理 二、Docker的网络模式 1、host模式 1.1 host模式原理 1.2 host模式实操 2、Container模式 2.2 container模式实操 3、none模式 4、bridger模式 4.1 bridge模式的原理 4.2 bridge实操 5、overlay模式 6、自定义网络模式 6.1 为什么需要…

Ubuntu20 编译 Android 12源码

1.安装基础库 推荐使用 Ubuntu 20.04 及以上版本编译&#xff0c;会少不少麻烦&#xff0c;以下是我的虚拟机配置 执行命令安装依赖库 // 第一步执行 update sudo apt-get update//安装相关依赖sudo apt-get install -y libx11-dev:i386 libreadline6-dev:i386 libgl1-mesa-de…

【c++】入门3

引用 1.swap交换两个变量值的时候可以用引用 2.例题中通过前序遍历数组构建二叉树&#xff0c;可以用引用传别名. #include <stdio.h> #include <stdlib.h> typedef struct BinaryTreeNode {char data;struct BinaryTreeNode* left;struct BinaryTreeNode* right; …

数据库设计——DML

D M L \huge{DML} DML DML&#xff1a;数据库操作语言&#xff0c;用来对数据库中的数据进行增删改查。 增&#xff08;INSERT&#xff09; 使用insert来向数据库中增加数据。 示例&#xff1a; -- DML : 数据操作语言 -- DML : 插入数据 - insert -- 1. 为 tb_emp 表的 us…

烟花燃放如何管控?智能分析网关V4烟火检测保障烟火安全

一、方案背景 随着元旦佳节的热潮退去&#xff0c;春节也即将来临&#xff0c;在众多传统的中国节日里&#xff0c;烟花与烧纸祭祀都是必不可少的&#xff0c;一方面表达了人们对节日的庆祝的期许&#xff0c;另一方面也是一种对故者思念的寄托。烟花爆竹的燃放不仅存在着巨大的…

SparkSQL基础解析(三)

1、 Spark SQL概述 1.1什么是Spark SQL Spark SQL是Spark用来处理结构化数据的一个模块&#xff0c;它提供了2个编程抽象&#xff1a;DataFrame和 DataSet&#xff0c;并且作为分布式SQL查询引擎的作用。 我们已经学习了Hive&#xff0c;它是将Hive SQL转换成MapReduce然后提…

【springboot+vue项目(十一)】springboot整合EasyExcel

EasyExcel是阿里巴巴开源的一个Java库&#xff0c;用于操作Excel文件。它提供了简单易用的API&#xff0c;可以读取、写入和转换Excel文件&#xff0c;支持大量数据的导入和导出操作。 一、添加依赖&#xff08;版本3.2&#xff09; <!--easyexcel操作excel--> <depe…

风靡全网的Jmeter+ant+jenkins接口自动化测试框架

大致思路&#xff1a;Jmeter可以做接口测试&#xff0c;也能做压力测试&#xff0c;而且是开源软件&#xff1b;Ant是基于Java的构建工具&#xff0c;完成脚本执行并收集结果生成报告&#xff0c;可以跨平台&#xff0c;Jenkins是持续集成工具。将这三者结合起来可以搭建一套We…

Prometheus 不能访问k8s的中的一些metrics的问题(controller-manager、scheduler、etcd)

主要有三个点 controller-manager、scheduler、etcd 参考&#xff1a; https://www.cnblogs.com/ltaodream/p/15448953.html kube-scheduler 在每台master节点执行 vim /etc/kubernetes/manifests/kube-scheduler.yaml 将 --bind-address127.0.0.1 改为 --bind-address…

基于SSM框架的宠物商城系统

开发语言&#xff1a;Java 框架&#xff1a;springboot JDK版本&#xff1a;JDK1.8 服务器&#xff1a;tomcat7 数据库&#xff1a;mysql 5.7 数据库工具&#xff1a;Navicat11 开发软件&#xff1a;eclipse/myeclipse/idea Maven包&#xff1a;Maven3.3.9 功能模块&…

软件测试|一篇文章带你深入理解SQL约束

深入理解SQL约束&#xff1a;保障数据完整性和一致性的重要工具 SQL约束是在关系型数据库中用于保障数据完整性和一致性的重要工具。本文将深入探讨SQL约束的概念、类型以及应用&#xff0c;以帮助读者更好地理解和使用SQL约束来确保数据库中的数据质量。 SQL约束 约束&…

vue动态组件、保持存活

加 component :is 引入组件名称 <component :is"tabcom"></component> keep-alive 保持存活 <keep-alive> <component :is"tabcom"></component></keep-alive> 保持存活&#xff1a;切换组件后&#xff0c;不重…

人机交互主板定制_基于MT8735安卓核心板的自助查询机方案

人机交互主板是一种商显智能终端主板&#xff0c;广泛应用于广告机、工控一体机、教学一体机、智能自助终端、考勤机、智能零售终端、O2O智能设备、取号机、计算机视觉、医疗健康设备、机器人设备等领域。 人机交互主板采用联发科MTK8735芯片平台&#xff0c;四核Cortex-A53架构…

Sectigo与Geotrust ov多域名证书的区别

Sectigo和Geotrust都是比较知名的CA认证机构。其中&#xff0c;Sectigo原名Comodo&#xff0c;在2018年整合SSL证书业务&#xff0c;改名为Sectigo&#xff0c;旗下的SSL证书产品根证书也变为Sectigo。Geotrust则是另一个备受信任的数字证书品牌&#xff0c;现在是Digicert旗下…

Python Gradio构建简单的交互界面

Gradio 是一个用于构建机器学习和数据科学的交互式应用程序的 Python 库&#xff0c;但是我们可以用它来构建一些简单的交互界面&#xff0c;其代码之简单令人震惊 文本输入输出 import gradio as grdef szu(text):return textinterface gr.Interface(fnszu, inputs"text…