Netty实战(二)

news2025/1/16 8:48:55

第一个Netty程序

  • 一、环境准备
  • 二、Netty 客户端/服务器概览
  • 三、编写 Echo 服务器
    • 3.1 ChannelHandler 和业务逻辑
    • 3.2 引导服务器
  • 四、编写 Echo 客户端
    • 4.1 通过 ChannelHandler 实现客户端逻辑
    • 4.2 引导客户端
  • 五、构建和运行 Echo 服务器和客户端

一、环境准备

Netty需要的运行环境很简单,只有2个。

  • JDK 1.8+
  • Apache Maven 3.3.9+

二、Netty 客户端/服务器概览

在这里插入图片描述
如图,展示了一个我们将要编写的 Echo 客户端和服务器应用程序。该图展示是多个客户端同时连接到一台服务器。所能够支持的客户端数量,在理论上,仅受限于系统的可用资源(以及所使用的 JDK 版本可能会施加的限制)。

Echo 客户端和服务器之间的交互是非常简单的;在客户端建立一个连接之后,它会向服务器发送一个或多个消息,反过来服务器又会将每个消息回送给客户端。虽然它本身看起来好像用处不大,但它充分地体现了客户端/服务器系统中典型的请求-响应交互模式

三、编写 Echo 服务器

所有的 Netty 服务器都需要以下两部分。

  • 至少一个 ChannelHandler—该组件实现了服务器对从客户端接收的数据的处理,即它的业务逻辑。
  • 引导—这是配置服务器的启动代码。至少,它会将服务器绑定到它要监听连接请求的端口上。

3.1 ChannelHandler 和业务逻辑

上一篇博文我们介绍了 Future 和回调,并且阐述了它们在事件驱动设计中的应用。我们还讨论了 ChannelHandler,它是一个接口族的父接口,它的实现负责接收并响应事件通知。

在 Netty 应用程序中,所有的数据处理逻辑都包含在这些核心抽象的实现中。因为你的 Echo 服务器会响应传入的消息,所以它需要实现ChannelInboundHandler 接口,用来定义响应入站事件的方法。简单的应用程序只需要用到少量的这些方法,所以继承 ChannelInboundHandlerAdapter 类也就足够了,它提供了ChannelInboundHandler 的默认实现。

我们将要用到的方法是:

  • channelRead() :对于每个传入的消息都要调用;
  • channelReadComplete() : 通知ChannelInboundHandler最后一次对channelRead()的调用是当前批量读取中的最后一条消息;
  • exceptionCaught() :在读取操作期间,有异常抛出时会调用。

该 Echo 服务器的 ChannelHandler 实现是 EchoServerHandler,如代码:

package com.example.netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

/**
 * @author lhd
 * @date 2023/05/16 15:05
 * @notes Netty Echo服务端简单逻辑
 */

//表示channel可以并多个实例共享,它是线程安全的
@ChannelHandler.Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf in = (ByteBuf) msg;
        //将消息打印到控制台
        System.out.println("Server received: " + in.toString(CharsetUtil.UTF_8));
        //将收到的消息写给发送者,而不冲刷出站消息
        ctx.write(in);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        //将未决消息冲刷到远程节点,并且关闭该 Channe
        ctx.writeAndFlush(Unpooled.EMPTY_BUFFER)
                .addListener(ChannelFutureListener.CLOSE);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        //打印异常堆栈跟踪
        cause.printStackTrace();
        //关闭该channel
        ctx.close();
    }
}

ChannelInboundHandlerAdapter 有一个直观的 API,并且它的每个方法都可以被重写以挂钩到事件生命周期的恰当点上。

因为需要处理所有接收到的数据,所以我们重写了 channelRead() 方法。在这个服务器应用程序中,我们将数据简单地回送给了远程节点。

重写 exceptionCaught() 方法允许我们对 Throwable 的任何子类型做出反应,在这里你记录了异常并关闭了连接。

虽然一个更加完善的应用程序也许会尝试从异常中恢复,但在这个场景下,只是通过简单地关闭连接来通知远程节点发生了错误。

ps:如果不捕获异常,会发生什么呢?

每个 Channel 都拥有一个与之相关联的 ChannelPipeline,其持有一个 ChannelHandler 的实例链。在默认的情况下,ChannelHandler 会把对它的方法的调用转发给链中的下一个 ChannelHandler。因此,如果 exceptionCaught()方法没有被该链中的某处实现,那么所接收的异常将会被传递到 ChannelPipeline 的尾端并被记录。为此,你的应用程序应该提供至少有一个实现exceptionCaught()方法的 ChannelHandler。

除了 ChannelInboundHandlerAdapter 之外,还有很多需要学习ChannelHandler的子类型和实现。这些之后会一一说明,目前,我们只关注:

  • 针对不同类型的事件来调用 ChannelHandler;
  • 应用程序通过实现或者扩展 ChannelHandler 来挂钩到事件的生命周期,并且提供自定义的应用程序逻辑;
  • 在架构上,ChannelHandler 有助于保持业务逻辑与网络处理代码的分离。这简化了开发过程,因为代码必须不断地演化以响应不断变化的需求。

3.2 引导服务器

下面我们准备开始构建服务器。构建服务器涉及到两个内容:

  • 绑定到服务器将在其上监听并接受传入连接请求的端口;
  • 配置 Channel,以将有关的入站消息通知给 EchoServerHandler 实例。
package com.example.netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

import java.net.InetSocketAddress;

/**
 * @author lhd
 * @date 2023/05/16 15:21
 * @notes Netty引导服务器
 */
public class EchoServer {

    public static void main(String[] args) throws Exception {
        //调用服务器的 start()方法
        new EchoServer().start();
    }

    public void start() throws Exception {
        final EchoServerHandler serverHandler = new EchoServerHandler();
        //创建EventLoopGroup
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            //创建ServerBootstra
            ServerBootstrap b = new ServerBootstrap();
            //指定服务器监视端口
             int port = 8080;
            b.group(group)
                    //指定所使用的 NIO 传输 Channel
                    //因为我们正在使用的是 NIO 传输,所以你指定了 NioEventLoopGroup 来接受和处理新的连接,
                    // 并且将 Channel 的类型指定为 NioServerSocketChannel 。
                    .channel(NioServerSocketChannel.class)
                    //使用指定的端口设置套接字地址
                    //将本地地址设置为一个具有选定端口的 InetSocketAddress 。服务器将绑定到这个地址以监听新的连接请求
                    .localAddress(new InetSocketAddress(port))
                    //添加一个EchoServerHandler 到子Channel的 ChannelPipeline
                    //这里使用了一个特殊的类——ChannelInitializer。这是关键。
                    // 当一个新的连接被接受时,一个新的子 Channel 将会被创建,而 ChannelInitializer 将会把一个你的
                    //EchoServerHandler 的实例添加到该 Channel 的 ChannelPipeline 中。正如我们之前所解释的,
                    // 这个 ChannelHandler 将会收到有关入站消息的通知。
                    .childHandler(new ChannelInitializer<SocketChannel>(){
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            //EchoServerHandler 被标注为 @Shareable,所以我们可以总是使用同样的实例
                            //实际上所有客户端都是使用的同一个EchoServerHandler
                            ch.pipeline().addLast(serverHandler);
                        }
                    });
            //异步地绑定服务器,调用 sync()方法阻塞等待直到绑定完成
            //sync()方法的调用将导致当前 Thread阻塞,一直到绑定操作完成为止
            ChannelFuture f = b.bind().sync();
            //获取 Channel 的CloseFuture,并且阻塞当前线
            //该应用程序将会阻塞等待直到服务器的 Channel关闭(因为你在 Channel 的 CloseFuture 上调用了 sync()方法)
            f.channel().closeFuture().sync();
        } finally {
            //关闭 EventLoopGroup,释放所有的资源,包括所有被创建的线程
            group.shutdownGracefully().sync();
        }
    }
}

我们总结一下服务器实现中的重要步骤。下面这些是服务器的主要代码组件:

  • EchoServerHandler 实现了业务逻辑;
  • main()方法引导了服务器;
    引导过程中所需要的步骤如下:
    • 创建一个 ServerBootstrap 的实例以引导和绑定服务器;
    • 创建并分配一个 NioEventLoopGroup 实例以进行事件的处理,如接受新连接以及读/写数据;
    • 指定服务器绑定的本地的 InetSocketAddress;
    • 使用一个 EchoServerHandler 的实例初始化每一个新的 Channel;
    • 调用 ServerBootstrap.bind()方法以绑定服务器。

到此我们的引导服务器已经完成。

四、编写 Echo 客户端

Echo 客户端将会:
(1)连接到服务器;
(2)发送一个或者多个消息;
(3)对于每个消息,等待并接收从服务器发回的相同的消息;
(4)关闭连接。
编写客户端所涉及的两个主要代码部分也是业务逻辑和引导,和你在服务器中看到的一样。

4.1 通过 ChannelHandler 实现客户端逻辑

如同服务器,客户端将拥有一个用来处理数据的 ChannelInboundHandler。在这个场景下,我们将扩展 SimpleChannelInboundHandler 类以处理所有必须的任务。这要求重写下面的方法:

  • channelActive() : 在到服务器的连接已经建立之后将被调用;
  • channelRead0() : 当从服务器接收到一条消息时被调用;
  • exceptionCaught() :在处理过程中引发异常时被调用。

具体代码可以参考如下:

package com.example.netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;

/**
 * @author lhd
 * @date 2023/05/16 15:45
 * @notes Netty 简单的客户端逻辑
 */

//标记该类的实例可以被多个 Channel 共享
@ChannelHandler.Sharable
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {

    //当被通知 Channel是活跃的时候,发送一条消息
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        ctx.writeAndFlush(Unpooled.copiedBuffer("Netty rocks!", CharsetUtil.UTF_8));
    }

    //记录已接收消息的转储
    @Override
    public void channelRead0(ChannelHandlerContext ctx, ByteBuf in) {
        System.out.println("Client received: " + in.toString(CharsetUtil.UTF_8));
    }

    //在发生异常时,记录错误并关闭Channel
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

首先,我们重写了 channelActive() 方法,其将在一个连接建立时被调用。这确保了数据将会被尽可能快地写入服务器,其在这个场景下是一个编码了字符串"Netty rocks!"的字节缓冲区。

接下来,我们重写了 channelRead0() 方法。每当接收数据时,都会调用这个方法。由服务器发送的消息可能会被分块接收。也就是说,如果服务器发送了 5 字节,那么不能保证这 5 字节会被一次性接收。即使是对于这么少量的数据,channelRead0()方法也可能会被调用两次,第一次使用一个持有 3 字节的 ByteBuf(Netty 的字节容器),第二次使用一个持有 2 字节的 ByteBuf。作为一个面向流的协议,TCP 保证了字节数组将会按照服务器发送它们的顺序被接收。

ps:所以channelRead0()的调用次数不一定等于服务器发布消息的次数

重写的第三个方法是 exceptionCaught()。如同在 EchoServerHandler(3.1中的代码示例)中所示,记录 Throwable,关闭 Channel,在这个场景下,终止到服务器的连接。

ps:为什么客户端继承SimpleChannelInboundHandler 而不是ChannelInboundHandler?

在客户端,当 channelRead0()方法完成时,我们已经有了传入消息,并且已经处理完它了。当该方法返回时,SimpleChannelInboundHandler 负责释放指向保存该消息的 ByteBuf 的内存引用。

在 EchoServerHandler 中,我们仍然需要将传入消息回送给发送者,而 write()操作是异步的,直到 channelRead()方法返回后可能仍然没有完成。为此,EchoServerHandler扩展了 ChannelInboundHandlerAdapter,其在这个时间点上不会释放消息。消息在 EchoServerHandler 的 channelReadComplete()方法中,当 writeAndFlush()方法被调用时被释放。

4.2 引导客户端

引导客户端类似于引导服务器,不同的是,客户端是使用主机和端口参数来连接远程地址,也就是这里的 Echo 服务器的地址,而不是绑定到一个一直被监听的端口。

package com.example.netty;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.net.InetSocketAddress;

/**
 * @author lhd
 * @date 2023/05/16 15:59
 * @notes 引导客户端
 */
public class EchoClient {
  
    public void start() throws Exception {
        //指定 EventLoopGroup 以处理客户端事件;需要适用于 NIO 的实现
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            //创建 Bootstrap
            Bootstrap b = new Bootstrap();
            b.group(group)
                    //适用于 NIO 传输的 Channel 类型
                    .channel(NioSocketChannel.class)
                    //设置服务器的InetSocketAddress
                    .remoteAddress(new InetSocketAddress("127.0.0.1", 8080))
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            //在创建Channel时,向 ChannelPipeline中添加一个 EchoClientHandler 实例
                            ch.pipeline().addLast(new EchoClientHandler());}
                    });
            //连接到远程节点,阻塞等待直到连接完成
            ChannelFuture f = b.connect().sync();
            //阻塞,直到Channel 关闭
            f.channel().closeFuture().sync();
        } finally {
            //关闭线程池并且释放所有的资源
            group.shutdownGracefully().sync();
        }
    }
 public static void main(String[] args) throws Exception {
        new EchoClient().start();
    }
}

总结一下要点:

  • 为初始化客户端,创建了一个 Bootstrap 实例;
  • 为进行事件处理分配了一个 NioEventLoopGroup 实例,其中事件处理包括创建新的连接以及处理入站和出站数据;
  • 为服务器连接创建了一个 InetSocketAddress 实例;
  • 当连接被建立时,一个 EchoClientHandler 实例会被安装到(该 Channel 的)
    ChannelPipeline 中;
  • 在一切都设置完成后,调用 Bootstrap.connect()方法连接到远程节点;

综上客户端的构建已经完成。

五、构建和运行 Echo 服务器和客户端

将我们上面的代码复制到IDEA中运行,先启动服务端在启动客户端会得到以下预期效果:

服务端控制台打印:
在这里插入图片描述
客户端控制台打印:
在这里插入图片描述
我们关闭服务端后,客户端控制台打印:
在这里插入图片描述
因为服务端关闭,触发了客户端 EchoClientHandler 中的exceptionCaught()方法,打印出了异常堆栈并关闭了连接。

这只是一个简单的应用程序,但是它可以伸缩到支持数千个并发连接——每秒可以比普通的基于套接字的 Java 应用程序处理多得多的消息。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/532966.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

U盘怎么加密?最简单的U盘加密方法

说起U盘&#xff0c;相信每个人都不会感到陌生&#xff0c;它是最常用的移动存储设备。那么&#xff0c;你会加密U盘吗&#xff1f;相信不少人并不知道这个问题的答案。下面小编就来教大家自己动手制作加密U盘。 首先&#xff0c;我们需要提前做好准备工作&#xff0c;一个可以…

04-数组和字符串

概述 同一个数组所有的成员都是相同的数据类型&#xff0c;同时所有的成员在内存中的地址是连续的。 一维数组 全局数组若不初始化&#xff0c;编译器将其初始化为零。局部数组若不初始化&#xff0c;内容为随机值。 int a[10] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };//定义一…

瑞吉外卖 - 员工信息分页查询功能(7)

某马瑞吉外卖单体架构项目完整开发文档&#xff0c;基于 Spring Boot 2.7.11 JDK 11。预计 5 月 20 日前更新完成&#xff0c;有需要的胖友记得一键三连&#xff0c;关注主页 “瑞吉外卖” 专栏获取最新文章。 相关资料&#xff1a;https://pan.baidu.com/s/1rO1Vytcp67mcw-PD…

NodeJs之调试

关于调试 当我们只专注于前端的时候&#xff0c;我们习惯性F12&#xff0c;这会给我们带来安全与舒心的感觉。 但是当我们使用NodeJs来开发后台的时候&#xff0c;我想噩梦来了。 但是也别泰国担心&#xff0c;NodeJs的调试是很不方便&#xff01;这是肯定的。 但是还好&…

Spring MVC优雅处理业务异常

本文中&#xff0c;我会描述如何在应用程序的不同层次&#xff0c;优雅地处理业务异常。 异常定义 BusinessException基类定义如下&#xff0c;注意异常中携带业务错误码&#xff0c;方便前端处理异常&#xff1a; public class BusinessException extends RuntimeException…

哪些云渲染服务用于多GPU渲染?

众所周知&#xff0c;GPU渲染 可以使用显卡代替CPU进行渲染&#xff0c;可以显着加快渲染速度&#xff0c;因为GPU主要是为快速图像渲染而量身定制的。GPU的诞生是为了应对图形密集型应用程序&#xff0c;这些应用程序会给CPU带来负担并阻碍计算性能。GPU渲染的原理是在多个数据…

信创办公–基于WPS的EXCEL最佳实践系列 (图表)

信创办公–基于WPS的EXCEL最佳实践系列 &#xff08;图表&#xff09; 目录 应用背景操作步骤1、创建图表和图形2、添加其他数据序列3、在源数据的行与列之间切换4、添加图例5、调整图表和图形的大小6、修改图表和图形参数7、应用图表布局和样式8、设置图表和图形的位置9、插入…

4面都过了,最后要价10K,HR说我不尊重华为....

在不知道一个公司的普遍薪资水平的时候&#xff0c;很多面试者不敢盲目的开价&#xff0c;但就因为这样可能使得面试官怀疑你的能力。一位网友就在网上诉说了自己的经历&#xff0c;男子是一位测试员&#xff0c;已经有九年的工作经历了&#xff0c;能力自己觉得还不错。 因为…

单片机课设 - 液晶显示屏显示时间(实验板实现)

目录 前言&#xff1a;本代码涉及的主要知识&#xff1a;代码&#xff08;实验板实现代码&#xff09;&#xff1a; 前言&#xff1a; 设计本代码的主要目的是为了完成期末作业&#xff0c;即在液晶显示屏上显示、时间、日期、温度&#xff0c;以及用按键控制时间、温度的显示。…

Leetcode50. Pow(x, n)

Every day a Leetcode 题目来源&#xff1a;50. Pow(x, n) 解法1&#xff1a;递归 代码&#xff1a; /** lc appleetcode.cn id50 langcpp** [50] Pow(x, n)*/// lc codestart class Solution { public:double myPow(double x, int n){if (n 0)return 1.0;if (n < 0)re…

华为OD机试真题 Java 实现【获取最大软件版本号】【2023Q1 100分】

一、题目描述 Maven版本号定义,<主版本>.<次版本><增量版本>-<里程碑版本> 举例3.1.4-beta 其中,主版本和次版本都是必须的,主版本,次版本,增量版本由多位数字组成,可能包含前导零,里程碑版本由字符串组成。 <主版本>.<次版本>增…

纯代码的3D玫瑰花,有个这个还怕女朋友不开心?

先上效果图&#xff1a; 再上代码&#xff1a; <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>Title</title><style>import url("https://fonts.googleapis.com/css2?familyNico…

Anicube NFT 作品集

Anicube 是韩国娱乐公司 Cube Entertainment 和区块链公司 Animoca Brands 的合资公司 Anicube Entertainment 推出的 IP 空间。其主题是音乐元宇宙&#xff0c;各种基于 K-pop 的表演和推广活动将在主舞台和音乐家星球上举行。 此外&#xff0c;这个系列的购买者还将获得特别奖…

web自动化测试:selenium怎么实现关键字驱动

要做 ui 自动化测试&#xff0c;使用关键字驱动可以说是必须会的一种测试方式&#xff0c;它既可以在纯代码的自动化程序中运行&#xff0c;也可以在测试平台中使用。 使用纯代码方式时&#xff0c;自动化工程师先写好一个通用的程序&#xff0c;其他手工测试人员只需要把执行…

gitlab的CICD

大体步骤4步 1.购买阿里云服务器centos 7.6 2.在服务器上安装gitlab-ce 3.在服务器上安装gitlab-runner 4.在gitlab创建一个项目&#xff0c;拉到本地修改后再提交&#xff0c;触发自动部署 视频教程 看不懂文章的&#xff0c;可以看这个视频&#xff0c;超详细gitlab-cicd-…

国产信创适配-东方通TongWeb安装,使用记录

之前项目使用的tomcat容器,需适配东方通 tongweb容器,记录下在国产麒麟ky10.aarch64 服务器下安装东方通容器的过程(对于麒麟内核服务器操作起来和centos7大致一样) 东方通容器是商业的,没有免费版本,企业级安装是公司和东方通联系给了lisence 经过实际测试发现,东方通提…

深度学习技巧应用15-自动机器学习Autogluon的应用技巧

大家好,我是微学AI,今天给大家介绍一下深度学习技巧应用15-自动机器学习Autogluon的应用技巧,Autogluon是一个开源的自动化机器学习工具包,Autogluon的开发目标是为机器学习从业者提供一个高效、易用、可扩展的自动化机器学习工具,让机器学习变得更加简单快捷。本文采用儿…

绘制混淆矩阵(MatLab/Python)

本文主要简单介绍如何绘制混淆矩阵 首先混淆矩阵是机器学习中总结分类模型预测结果的情形分析表&#xff0c;以矩阵形式将数据集中的记录按照真实的类别与分类模型预测的类别判断两个标准进行汇总。 其实混淆矩阵就是用来判断我们的算法的分类准确度的一个可视化矩阵 1…

得物AI平台-KubeAI推理训练引擎设计和实践

1.KubeAI介绍 KubeAI是得物AI平台&#xff0c;是我们在容器化过程中&#xff0c;逐步收集和挖掘公司各业务域在AI模型研究和生产迭代过程中的需求&#xff0c;逐步建设而成的一个云原生AI平台。KubeAI以模型为主线提供了从模型开发&#xff0c;到模型训练&#xff0c;再到推理…

本科毕业生10大高薪专业出炉,IT行业赢麻了

据环球网报道&#xff0c;现在大学毕业生转行率高达80%&#xff01; 非常后悔&#xff01;有不少粉丝向播妞倾诉&#xff0c;曾经以为读了大学就能找到体面的工作&#xff0c;实际上是掉入了天坑专业&#xff0c;成了现实版孔乙己。 大学生找不到对口好工作&#xff0c;似乎已成…