Netty自定义消息协议的实现逻辑处理粘包拆包、心跳机制

news2025/1/6 18:39:07

Netty

  • 自定义消息协议的实现逻辑
    • 自定义编码器
  • 心跳机制
    • 实现客户端发送心跳包

自定义消息协议的实现逻辑

在这里插入图片描述
消息协议:这一次消息需要包含两个部分,即消息长度和消息内容本身。
自定义消息编码器︰消息编码器将客户端发送的消息转换成遵守消息协议的消息,即包含消息长度和消息内容的消息
自定义消息解码器∶消息解码器根据消息协议的消息长度,来获得指定长度的消息内容。

自定义编码器

自定义消息协议:

//自定义消息协议
public class MessageProtocal {
    //消息的长度
    private int length;
    //消息的内容
    private byte[] content;

    public int getLength() {
        return length;
    }

    public void setLength(int length) {
        this.length = length;
    }

    public byte[] getContent() {
        return content;
    }

    public void setContent(byte[] content) {
        this.content = content;
    }
}

客户端基本代码

public class NettyClient {
    public static void main(String[] args) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup(1);
        Bootstrap bootstrap = new Bootstrap();
        //设置相关的参数
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        //添加处理器,分包编码器
                        pipeline.addLast(new MessageEncoder());
                        //添加具体的业务处理器
                        pipeline.addLast(new NettyMessageClientHandler());
                    }
                });
        System.out.println("客户端启动了");
        ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9090).sync();
        channelFuture.channel().closeFuture().sync();
        group.shutdownGracefully();
    }
}

客户端业务代码

public class NettyMessageClientHandler extends SimpleChannelInboundHandler<MessageProtocal> {
    //连接通道创建后要向服务端发送消息

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        for(int i=0;i<200;i++){
            String msg = "西安科技大学";
            //创建消息协议对象
            MessageProtocal messageProtocal = new MessageProtocal();
            messageProtocal.setLength(msg.getBytes(StandardCharsets.UTF_8).length);
            messageProtocal.setContent(msg.getBytes(StandardCharsets.UTF_8));
            //发送协议对象,注意此时ctx只能发送Bytebuf数据,因此需要用编码器把它编码成Bytebuf数据
            ctx.writeAndFlush(messageProtocal);
        }
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MessageProtocal msg) throws Exception {

    }
}

自定义编码器

public class MessageEncoder extends MessageToByteEncoder<MessageProtocal> {
    @Override
    protected void encode(ChannelHandlerContext ctx, MessageProtocal msg, ByteBuf out) throws Exception {
        out.writeInt(msg.getLength());
        out.writeBytes(msg.getContent());
    }
}

服务端基本代码

public class NettyServer {
    public static void main(String[] args) throws Exception {
        EventLoopGroup boosGroup = new NioEventLoopGroup(1);
        EventLoopGroup workGroup = new NioEventLoopGroup();
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(boosGroup,workGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        //添加解码器
                        pipeline.addLast(new MessageDecoder());
                        pipeline.addLast(new NettyMessageServerHandler());
                    }
                });
        System.out.println("Netty的服务端启动了");
        ChannelFuture channelFuture = serverBootstrap.bind(9090).sync();
        channelFuture.channel().closeFuture().sync();
        boosGroup.shutdownGracefully();
        workGroup.shutdownGracefully();
    }
}

自定义解码器

//自定义解码器代码
public class MessageDecoder extends ByteToMessageDecoder {
    int length = 0;


    //ctx
    //in:客户端发送来的MessageProtocol编码后的ByteBuf数据
    //out:out里的数据会被放行到下一个handler把解码出来的MessageProtocol放到out里面
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        System.out.println("ByteBuf:"+in);
        //获得前面的4个字节的数据 == 描述实际内容的长度
        if(in.readableBytes()>=4){
            //ByteBuf里面可能有MessageProtocol数据
            if(length==0){
                length = in.readInt();
            }
            //length = 15
            if(in.readableBytes()<length){
                //说明数据还没到齐,等待下一次调用decode
                System.out.println("当前数据量不够,继续等待");
                return;
            }
            //可读数据量>=length ==> 意味着这一次的MessageProtocol的内容已经到齐了
            //创建了一个指定length长度的字节数组
            byte[] content = new byte[length];
            //把ByteBuf里面的指定长度的数据读到content数组中
            in.readBytes(content);
            //创建协议MessageProtocol对象赋值
            MessageProtocal messageProtocal = new MessageProtocal();
            messageProtocal.setLength(length);
            messageProtocal.setContent(content);
            out.add(messageProtocal);
            length=0;
        }
    }
}

服务端业务处理代码

public class NettyMessageServerHandler extends SimpleChannelInboundHandler<MessageProtocal> {


    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MessageProtocal msg) throws Exception {
        System.out.println("---服务器收到的数据---");
        System.out.println("消息的长度:"+msg.getLength());
        System.out.println("消息的内容:"+new String(msg.getContent(), StandardCharsets.UTF_8));
    }
}

运行结果:
在这里插入图片描述


心跳机制

在分布式系统中,心跳机制常常在注册中心组件中提及,比如Zookeeper、Eureka、Nacos等,通过维护客户端的心跳,来判断客户端是否正常在线。如果客户端达到超时次数等预设的条件时,服务端将释放客户端的连接资源。
试想一下,当我们一个用来写数据的通道,它虽然没有下线,但这个通道长时间都不写数据了,是不是我们可以利用心跳机制,关闭此类通道及其对应的客户端

在这里插入图片描述

实现客户端发送心跳包

客户端基本代码

public class NettyClient {
    public static void main(String[] args) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup(1);
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioServerSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        //添加编解码器
                        pipeline.addLast(new StringDecoder());
                        pipeline.addLast(new StringEncoder());

                        pipeline.addLast(new NettyClientHandler());
                    }
                });
        System.out.println("客户端启动了");
        ChannelFuture channelFuture = bootstrap.connect("127.0.0.1",9090).sync();
        //模拟向服务端发送心跳数据
        String packet = "heartbeat packet";
        Random random = new Random();
        Channel channel = channelFuture.channel();
        while (channel.isActive()){
            //随机的事件来实现时间间隔等待
            int num = random.nextInt(10);
            Thread.sleep(num*1000);
            channel.writeAndFlush(packet);
        }
        group.shutdownGracefully();
    }
}

客户端拦截器

public class NettyClientHandler extends SimpleChannelInboundHandler<String> {
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
        System.out.println("客户端收到的数据"+s);
    }
}

在这里插入图片描述
IdleStateHandler类描述三种空闲状态
读空闲:在指定时间间隔内没有从Channel中读到数据,将会创建状态为READER_IDLE的IdleStateEvent对象。
写空闲︰在指定时间间隔内没有数据写入到Channel中,将会创建状态为WRITER_IDLE的ldleStateEvent对象。
读写空闲:在指定时间间隔内Channel中没有发生读写操作,将会创建状态为ALL_IDLE的ldleStateEvent对象。

服务端基本代码

public static void main(String[] args) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workGroup = new NioEventLoopGroup();
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(bossGroup,workGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        pipeline.addLast(new StringEncoder());
                        pipeline.addLast(new StringDecoder());
                        //超时状态处理器会在服务端发现有超过3秒没有没有发生读操作的话会触发超时事件
                        //创建出IdleStateEvent对象,将该对象交给下一个Handler
                        pipeline.addLast(new IdleStateHandler(3,0,0, TimeUnit.SECONDS));
                        //HeartbeatServerHandler必领重写userEventTriggered方法,用来做具体的超时的业务处理
                        pipeline.addLast(new HeartbeatServerHandler());
                    }
                });
        System.out.println("Netty服务端启动了");
        ChannelFuture channelFuture = serverBootstrap.bind(9090).sync();
        channelFuture.channel().closeFuture().sync();
        bossGroup.shutdownGracefully();
        workGroup.shutdownGracefully();
    }
}

服务端业务代码

public class HeartbeatServerHandler extends SimpleChannelInboundHandler<String> {
    int readIdleTimes = 0;

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
        System.out.println("服务端收到的心跳"+s);
        channelHandlerContext.writeAndFlush("服务端已经收到了心跳");
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        IdleStateEvent event = (IdleStateEvent)evt;
        switch (event.state()){
            case READER_IDLE:
                readIdleTimes++;
                break;
            case WRITER_IDLE:
                System.out.println("写超时");
                break;
            case ALL_IDLE:
                System.out.println("读写超时");
                break;
        }
        if(readIdleTimes>3){
            System.out.println("读超时超过三次,关闭连接");
            ctx.writeAndFlush("超时关闭");
            ctx.channel().close();
        }
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/827088.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Andorid解析XML格式数据遇到的坑

以下是《第一行代码 第三版》解析XML格式数据部分遇到的坑 一、首先是安装Apache遇到的坑 具体参考文章Apache服务器下载安装及使用&#xff08;更新&#xff09;_apache下载_★邱↓邱★的博客-CSDN博客&#xff08;可以不看文中的安装部分了&#xff09; 启动服务那块儿建议…

Java:Map的getOrDefault()方法结果仍为null

1、问题 今天在工作中遇到一个问题&#xff0c;在一个通用的数据处理方法中&#xff0c;方法会从一个Map类型参数通过key里获取对象value&#xff0c;但方法的调用者并不都会传递value实例&#xff0c;若没有获取到value则需要初始化一个&#xff0c;处理方式是调用了Map的getO…

操作系统的运行机制、中断和异常、系统调用

&#x1f40c;个人主页&#xff1a; &#x1f40c; 叶落闲庭 &#x1f4a8;我的专栏&#xff1a;&#x1f4a8; c语言 数据结构 javaweb 石可破也&#xff0c;而不可夺坚&#xff1b;丹可磨也&#xff0c;而不可夺赤。 操作系统 一、操作系统的运行机制1.1内核程序1.2应用程序1…

vue3项目基于vue-router跳转到登录页面

创建项目 #创建项目 #选择vue3 选择npm vue create devops-front#安装vue-router 路由 npm install -g cnpm --registryhttps://registry.npmmirror.com cnpm install vue-router4 #启动项目 vue run serve app.vue 定义<router-view/> 路由入口 <template>&l…

微信小程序(van-tabs) 去除横向滚动条样式(附加源码解决方案+报错图)

问题描述 今天第一次接触vant组件库。 ant官网地址适用于Vue3 支持Vue2、Vue3、微信小程序等 我在使用van-tabs组件时遇到了一个问题&#xff0c;如下图所示&#xff1a; 从图片上可以看到有个灰色的横向滚动条&#xff0c;一开始领导给我说这个问题&#xff0c;我反反复复都…

LED显示屏维修检测方法

电阻检测 对于显示屏的电阻检法&#xff0c;我们需将万用表调到电阻档&#xff0c;先检测一块正常电路板的某点到地电阻值&#xff0c;然后再检测另一块相同电路板的同一个点测试与正常的电阻值是否有不同&#xff0c;如有不同则就知道了该显示屏问题的范围&#xff0c;反之则不…

掌握终端基础技巧:Linux下的文件和目录复制操作

在Linux系统中&#xff0c;命令行终端是一个大而高效的工具&#xff0c;让使用者可以通过简单的命令完成各种任务。其中&#xff0c;文件和目录的复制操作是日常使用频率较高的一项操作。本文将介绍Linux下的文件和目录复制基础技巧&#xff0c;帮助您更好地掌握命令行终端的使…

直播读弹幕机器人:直播弹幕采集+文字转语音(附完整代码)

目录 前言代码实现请求数据解析数据文字转语音完整代码 高级点的tk界面版 前言 直播读弹幕机器人是指能够实时读取直播平台上观众发送的弹幕&#xff0c;并将其转化为语音进行播放的机器人。这种机器人通常会使用文字转语音技术&#xff0c;将接收到的弹幕文本转为语音&#x…

牛客小白月赛74 F题解

文章目录 最便宜的构建问题建模问题分析1.分析所求2.方法1用并查集判断k个点集是否连通&#xff0c;不连通则由小到大添加边代码 3. 方法2使用带权并查集维护当前集合所连通的点集个数代码 4.方法3通过二分确定值代码 最便宜的构建 问题建模 给定n个点m条边的带权无向图&#…

初阶结构体(超详解)

初阶结构体 1. 结构体的声明1.1 结构的基础知识1.2 结构的声明1.3 结构成员的类型1.4 结构体变量的初始化和定义 2. 结构体的访问3. 结构体传参 1. 结构体的声明 1.1 结构的基础知识 结构是一些值的集合&#xff0c;这些值称为成员变量。结构的每个成员可以是不同类型的变量 结…

电商高并发设计之SpringBoot整合Redis实现布隆过滤器

文章目录 问题背景前言布隆过滤器原理使用场景基础中间件搭建如何实现布隆过滤器引入依赖注入RedisTemplate布隆过滤器核心代码Redis操作布隆过滤器验证 总结 问题背景 研究布隆过滤器的实现方式以及使用场景 前言 本篇的代码都是参考SpringBootRedis布隆过滤器防恶意流量击穿缓…

让数据管理由繁至简的低代码开发平台

随着社会数字化能力的快速升级&#xff0c;各行各业正逐渐迈向数字化转型的新时代。尤其是AI的爆发&#xff0c;数据智能技术正在彻底改变着这个行业的面貌&#xff0c;随着越来越多的企业开始将人工智能、机器学习和大数据分析技术应用到其业务中&#xff0c;数据的价值正在得…

深度解剖动态内存管理

> 作者简介&#xff1a;დ旧言~&#xff0c;目前大一&#xff0c;现在学习Java&#xff0c;c&#xff0c;c&#xff0c;Python等 > 座右铭&#xff1a;松树千年终是朽&#xff0c;槿花一日自为荣。 > 望小伙伴们点赞&#x1f44d;收藏✨加关注哟&#x1f495;&#x1…

【项目 进程8】 2.17 内存映射(1) 2.18内存映射(2)

文章目录 2.17 内存映射&#xff08;1&#xff09;内存映射内存映射相关系统调用使用内存映射实现父子进程间通信使用内存映射实现没有关系的进程间的通信 2.18内存映射&#xff08;2&#xff09;内存映射的注意事项使用内存映射实现内存拷贝的功能匿名映射 2.17 内存映射&…

【Spring】学习Spring需要掌握的核心设计思想

目录 一、Spring是什么 二、什么是IoC容器 1、什么是容器 2、什么是IoC 3、Spring IoC 4、DI&#xff08;依赖注入&#xff09; 4.1、IoC和DI的区别 5、 DL&#xff08;依赖查找&#xff09; 一、Spring是什么 我们通常所说的Spring指的是Spring Framework&#xff08;…

DAY55:单调栈(一)每日温度+下一个更大元素Ⅰ

文章目录 739.每日温度栈数据结构单调栈思路单调栈原理单调栈注意点判断条件工作过程分析 完整版 496.下一个更大元素Ⅰ思路映射思路 完整版总结 739.每日温度 给定一个整数数组 temperatures &#xff0c;表示每天的温度&#xff0c;返回一个数组 answer &#xff0c;其中 an…

2023年第四届“华数杯”数学建模思路 - 案例:异常检测

文章目录 赛题思路一、简介 -- 关于异常检测异常检测监督学习 二、异常检测算法2. 箱线图分析3. 基于距离/密度4. 基于划分思想 赛题思路 &#xff08;赛题出来以后第一时间在CSDN分享&#xff09; https://blog.csdn.net/dc_sinor?typeblog 一、简介 – 关于异常检测 异常…

【Java】UWB高精度工业人员安全定位系统源码

基于VueSpring boot前后端分离架构开发的一套UWB技术高精度定位系统源码。 UWB高精度人员定位系统提供实时定位、电子围栏、轨迹回放等基础功能以及各种拓展功能,用户可根据实际需要任意选择搭配拓展功能。该系统简易部署&#xff0c;方便使用&#xff0c;实时响应。UWB高精度定…

Java on Azure Tooling 6月更新|标准消费和专用计划及本地存储账户(Azurite)支持

作者&#xff1a;Jialuo Gan - Program Manager, Developer Division at Microsoft 排版&#xff1a;Alan Wang 大家好&#xff0c;欢迎阅读 Java on Azure 工具的六月更新。在本次更新中&#xff0c;我们将介绍 Azure Spring Apps 标准消费和专用计划支持以及本地存储账户&…

二叉树迭代遍历

PS:以下代码均为C实现 1.二叉树前序遍历 力扣 给你二叉树的根节点 root &#xff0c;返回它节点值的 前序 遍历。 class Solution { public:vector<int> preorderTraversal(TreeNode* root) {stack<TreeNode*> st;vector<int> str;TreeNode* curroot;whil…