实战Netty!基于私有协议,怎样快速开发网络通信服务?

news2025/1/16 17:02:14

前言

今天我们一起来来聊聊怎么使用netty。

在工作中,我经常使用netty开发一些服务,掌握netty的工作原理,开发一些服务端以及客户端是非常简单,本篇文章,我们就以具体的协议来进行一个简单的服务的开发。

正文

私有协议

编写目的

本文档用于描述边缘计算单元(以下简称边缘盒)与上位机配置软件(以下简称上位机)之间进行的数据交互通信协议。

通信方式

边缘盒作为服务端,上位机作为客户端,采用TCP/IP协议的socket连接,端口号默认为6000,数据包采用字节的二进制数据传输。

数据包

数据包由包头和消息内容组成,包头固定10个字节,其内容如下:

标志: 包的前导字符,固定CYRC

负载长度: 负载字节数(不包括包头的长度);

协议版本号: 标识通信协议的版本,初版值为0x10

包类型: 标识数据包的操作类型,具体见下表:

取值含义说明
1查询上位机发送查询消息。
2设置上位机发送设置消息。
3查询应答边缘盒对查询请求的应答。
4设置应答边缘盒对设置请求的应答。
5订阅上位机发送给边缘盒订阅数据主动上报请求。
6主动上报边缘盒主动向上位机发送数据。
7心跳上位机发送心跳消息
8心跳应答边缘盒对心跳消息的应答
其他保留

校验位: 负载数据所有字节之和;

Reserve: 预留,值填0;

包体负载(消息内容)表示具体的数据对象,其内容如下:

对于查询、心跳等包类型,包体负载(消息内容)只需要对象标识,对象数据内容省略。

对象标识: 标识数据表的操作对象,具体如下:

取值含义说明
0心跳上位机连接后间隔时间发送心跳消息给边缘盒。

具体的协议内容就不做展示了,下面就开始服务的编写。

服务开发

这里我们开发一个上位机的配置软件(客户端),我们首先要来分析,怎么对数据包进行编解码,其实工作中,这个也是服务开发的核心所在,也是难点所在。

编写消息类

public class MyProtocol
{
    /**
     * 消息的开头的信息标志
     */
    private String head = "CYRC";
    /**
     * 消息的长度
     */
    private int contentLength;
    /**
     * 消息的内容
     */
    private byte[] content;

    public MyProtocol(int contentLength, byte[] content)
    {
        this.contentLength = contentLength;
        this.content = content;
    }

    public String getHead()
    {
        return head;
    }

    public void setHead(String head)
    {
        this.head = head;
    }

    public int getContentLength()
    {
        return contentLength;
    }

    public void setContentLength(int contentLength)
    {
        this.contentLength = contentLength;
    }

    public byte[] getContent()
    {
        return content;
    }

    public void setContent(byte[] content)
    {
        this.content = content;
    }

    public String byteToHex(byte[] bytes, int cnt)
    {
        String strHex;
        StringBuilder sb = new StringBuilder();
        for (int n = 0; n < cnt; n++)
        {
            strHex = Integer.toHexString(bytes[n] & 0xFF);
            sb.append((strHex.length() == 1) ? "0" + strHex : strHex);
            sb.append(" ");
        }
        return sb.toString().trim();
    }

    @Override
    public String toString()
    {
        return "MyProtocol [head=" + head + ", contentLength="
                + contentLength + ", content=" + byteToHex(content, contentLength) + "]";
    }
}
复制代码

MyDecoder解码器

@Slf4j
public class MyDecoder extends ByteToMessageDecoder
{
    /**
     * <pre>
     * 协议开始的标准head_data,CYRC,占据4个字节.
     * 表示数据的长度contentLength,占据2个字节.
     * </pre>
     */
    public final int BASE_LENGTH = 10;

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception
    {
        if (buffer.readableBytes() >= BASE_LENGTH)
        {
            if (buffer.readableBytes() > 2048)
            {
                buffer.skipBytes(buffer.readableBytes());
            }

            // 记录包头开始的index
            int beginReader;
            //CYRC   43 59 52 43
            while (true)
            {
                // 获取包头开始的index
                beginReader = buffer.readerIndex();
                // 标记包头开始的index
                buffer.markReaderIndex();
                // 读到了协议的开始标志,结束while循环
                int head1 = buffer.readUnsignedShort();
                int head2 = buffer.readUnsignedShort();
                if (head1 == 17241 && head2 == 21059)
                {
                    break;
                }

                // 未读到包头,略过一个字节
                // 每次略过,一个字节,去读取,包头信息的开始标记
                buffer.resetReaderIndex();
                buffer.readByte();
                // 当略过,一个字节之后,数据包的长度,又变得不满足
                // 此时,应该结束。等待后面的数据到达
                if (buffer.readableBytes() < BASE_LENGTH)
                {
                    return;
                }
            }

            // 消息的长度
            int length = buffer.readUnsignedShort() + 4;
            // 判断请求数据包数据是否到齐
            if (buffer.readableBytes() < length)
            {
                // 还原读指针
                buffer.readerIndex(beginReader);
                return;
            }

            // 读取data数据
            byte[] data = new byte[length];
            buffer.readBytes(data);

            MyProtocol protocol = new MyProtocol(data.length, data);
            out.add(protocol);
        }
    }
}
复制代码

MyEncoder编码器

public class MyEncoder extends MessageToByteEncoder<MyProtocol>
{
    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, MyProtocol myProtocol, ByteBuf out) throws Exception
    {
        // 1.写入消息的开头的信息标志(CYCR)
        out.writeBytes(myProtocol.getHead().getBytes());
        // 2.写入消息的长度(负载长度)
        out.writeShort(myProtocol.getContentLength() - 4);
        // 3.写入消息的内容(byte[]类型)
        out.writeBytes(myProtocol.getContent());
    }
}
复制代码

自定义ChannelInboundHandlerAdapter

@Slf4j
public class BootNettyClientChannelInboundHandlerAdapter extends ChannelInboundHandlerAdapter
{
    public BootNettyClientChannelInboundHandlerAdapter()
    {

    }

    /**
     * 从服务端收到新的数据时,这个方法会在收到消息时被调用
     *
     * @param ctx
     * @param msg
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
    {
        MyProtocol protocol = (MyProtocol) msg;
        log.info("接收到服务端的消息:" + protocol);
    }

    /**
     * 从服务端收到新的数据、读取完成时调用
     *
     * @param ctx
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws IOException
    {
        ctx.flush();
    }

    /**
     * 当出现 Throwable 对象才会被调用,即当 Netty 由于 IO 错误或者处理器在处理事件时抛出的异常时
     *
     * @param ctx
     * @param cause
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws InterruptedException
    {
        log.error("exceptionCaught:{}", cause.getMessage());
        ctx.close();//抛出异常,断开与客户端的连接
    }

    /**
     * 客户端与服务端第一次建立连接时 执行
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception
    {
        super.channelActive(ctx);
        InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
        String clientIp = insocket.getAddress().getHostAddress();
        log.info("channelActive------TCP客户端新建连接------clientIp:{}", clientIp);
    }

    /**
     * 客户端与服务端 断连时 执行
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception
    {
        super.channelInactive(ctx);
        InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
        String clientIp = insocket.getAddress().getHostAddress();
        ctx.close(); //断开连接时,必须关闭,否则造成资源浪费
        log.info("channelInactive------TCP客户端断开连接----------clientIp:{}", clientIp);
    }
}
复制代码

BootNettyClient客户端

@Slf4j
public class BootNettyClient
{
    public void connect(String host,int port)
    {
        /**
         * 客户端的NIO线程组
         *
         */
        EventLoopGroup group = new NioEventLoopGroup();
        try
        {
            /**
             * Bootstrap 是一个启动NIO服务的辅助启动类 客户端的
             */
            Bootstrap bootstrap = new Bootstrap();
            /**
             * 设置group
             */
            bootstrap = bootstrap.group(group);
            /**
             * 关联客户端通道
             */
            bootstrap = bootstrap.channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true);
            /**
             * 设置 I/O处理类,主要用于网络I/O事件,记录日志,编码、解码消息
             */
            bootstrap = bootstrap.handler(new ChannelInitializer<SocketChannel>()
            {
                @Override
                protected void initChannel(SocketChannel channel) throws Exception
                {
                    ChannelPipeline pipeline = channel.pipeline();
                    // 添加自定义协议的编解码工具
                    pipeline.addLast(new MyDecoder());
                    pipeline.addLast(new MyEncoder());
                    /**
                     * 自定义ChannelInboundHandlerAdapter
                     */
                    pipeline.addLast(new BootNettyClientChannelInboundHandlerAdapter());
                }
            });
            /**
             * 连接服务端
             */
            ChannelFuture f = bootstrap.connect(host, port).sync();
            log.info("TCP客户端连接成功, 地址是: " + host + ":" + port);
            /**
             * 等待连接端口关闭
             */
            f.channel().closeFuture().sync();
        }
        catch (Exception e)
        {
            log.error("启动netty client失败:", e);
        }
        finally
        {
            /**
             * 退出,释放资源
             */
            group.shutdownGracefully();
        }
    }
}
复制代码

NettyClientApplication程序启动类

@SpringBootApplication
public class NettyClientApplication implements CommandLineRunner {

	public static void main(String[] args) {
		SpringApplication.run(NettyClientApplication.class, args);
	}

	@Override
	public void run(String... args) throws Exception {
		new BootNettyClient().connect("172.16.1.100", 6000);
	}
}
复制代码

测试

利用网络调试助手工具,开启一个服务端,模拟发送数据

发送一个完整的包(43 59 52 43 00 01 10 02 00 00 0f),如下图,客户端完整接收数据。

半包测试数据(43 59 52 43 00 01 10 02 00),无日志打印,说明客户端没有接收该不完整数据。

粘包数据测试,两个包一起发送(43 59 52 43 00 01 10 02 00 00 0f 43 59 52 43 00 01 10 02 00 00 0f),如下图,客户端同时接收到两条数据。

粘包数据测试,一个半包发送(43 59 52 43 00 01 10 02 00 00 0f 43 59 52 43 00 01 10 02),如下图,可以看出,只接收到前面完整包的数据,后面的半包数据被忽略。

业务代码编写

业务代码,无非就是将收到的数据进行一些逻辑处理,数据的解析。编写一个接收消息处理类即可。示例如下

通信参数对象:

序号名称字节数取值范围备注
1对象标识13对象标识号:3
2IP地址4每个字节表示一段地址值(A.B.C.D,第一字节对应A,依次类推)
3端口2
4标志1[0-1]0:网络通信,1: 485通信(端口赋波特率,IP赋值0)
@Slf4j
public class ClientService {

  /** 接收边缘盒子消息 */
  public void receiveData(MyProtocol myProtocol) {
    try {
      byte[] data = myProtocol.getContent();
      int type = data[1];
      int objId = data[4];

      // 心跳应答
      if (type == PackageTypeConstant.HEART_BEAT_REPLY) {
        log.info("--------收到心跳回复----------");
      }
      // 查询应答
      else if (type == PackageTypeConstant.QUERY_RESULT) {
        switch (objId) {
          case ObjectIdConstant.SIGNAL:
            {
              reSignalParameter(data); // 接收通信参数
              break;
            }
    
  					//.....
     
          default:
            {
              break;
            }
        }
      }
    } catch (Exception e) {
      log.error("错误的消息指令..", e);
    }
  }

  private void reSignalParameter(byte[] data) {
    EventCenterService.getInstance()
        .submitEvent(
            new IEvent() {
              @Override
              public void execute() {
                try {
                  SignalParameter sp = new SignalParameter();

                  int idx = 5;
                  byte temp1;
                  byte temp2;
                  int a = (data[idx++] & 0xFF);
                  int b = (data[idx++] & 0xFF);
                  int c = (data[idx++] & 0xFF);
                  int d = (data[idx++] & 0xFF);
                  String ip = a + "." + b + "." + c + "." + d;

                  temp1 = data[idx++];
                  temp2 = data[idx++];
                  // 端口号
                  int port = ((char) (temp1 & 0xFF) << 8) | (char) (temp2 & 0xFF);
                  int sign = data[idx];

                  sp.setIp(ip);
                  sp.setPort(port);
                  sp.setSign(sign);

                  DataConfig.signalParameterList.add(sp);
                } catch (Exception e) {
                  log.error("通信参数解析出错:", e);
                }
              }
            });
  }
}
复制代码

后记

工作中,利用netty开发网络通信服务,数据的编解码处理好了,后面的业务代码相对就很容易了。

本篇文章,是我在工作中的一些实战经验,希望对netty感兴趣的小伙伴有点帮助。关于netty的原理这篇文章就不做过多介绍了,前面的文章也讲了很多,后面时间主要讲讲netty实际的运用。

任何一门技术,学会了和实际运用都是两码事,切记眼高手低,程序员从来不适合语言上的巨人,行动上的小人。看到这里,点赞,关注不迷路~~

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/12338.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

一键汇总报告模型可能会需要修改的地方

文章目录如何修改文件夹地址为指定地址?如何取消清除提醒, 避免每次点击弹窗选择?如何取消完成弹窗提醒?如果觉得反复打开工作簿太闪怎么办?如果有些报告的内容页不在第一个Sheet怎么办?如果想修改字段怎么办?运行程序前不想清除原有内容怎么办?报告页有内容但是没有被抓…

混合整数规划的机组组合附Matlab代码

✅作者简介&#xff1a;热爱科研的Matlab仿真开发者&#xff0c;修心和技术同步精进&#xff0c;matlab项目合作可私信。 &#x1f34e;个人主页&#xff1a;Matlab科研工作室 &#x1f34a;个人信条&#xff1a;格物致知。 更多Matlab仿真内容点击&#x1f447; 智能优化算法 …

Redis常见面试题(2022)

Redis基础 什么是 Redis&#xff1f; Redis 是一个开源&#xff08;BSD 许可&#xff09;、基于内存、支持多种数据结构的存储系统&#xff0c;可以作为数据库、缓存和消息中间件。它支持的数据结构有字符串&#xff08;strings&#xff09;、哈希&#xff08;hashes&#xf…

多分类交叉熵理解

多分类交叉熵有多种不同的表示形式&#xff0c;如下图所示&#xff1a; 但是&#xff0c;有时候我们读论文会深陷其中不能自拔。 也有很多读者、观众会纠正其他作者的文章、视频的交叉熵形式。 实际上&#xff0c;上述三种形式都是没有问题的。 这里&#xff0c;我们就要了解…

多媒体内容理解在美图社区的应用实践

导读&#xff1a;移动互联网时代&#xff0c;图像和短视频等多媒体内容爆发&#xff0c;基于计算机视觉的AI算法是多媒体内容分析的基础。在美图社区智能化发展的过程中&#xff0c;视频和图像分类打标、去重以及质量评估的结果&#xff0c;在推荐、搜索以及人工审核等多个场景…

【R语言数据科学】:变量选择(三)主成分回归和偏最小二乘回归

变量选择(三)主成分回归和偏最小二乘回归 🌸个人主页:JOJO数据科学📝个人介绍:统计学top3高校统计学硕士在读💌如果文章对你有帮助,欢迎✌关注、👍点赞、✌收藏、👍订阅专栏✨本文收录于【R语言数据科学】本系列主要介绍R语言在数据科学领域的应用包括: R语言编…

多分类问题的precision和recall以及F1 scores的计算

对于多分类问题&#xff0c;首先&#xff0c;对于每一个类的精准率&#xff08;Precision&#xff09;和召回率&#xff08;Recall&#xff09;&#xff0c;定义和二分类问题一致&#xff0c;但是计算上不再需要TP,FP,FN等量了&#xff1a;&#xff09; 比如对A, B, C三类有如…

SpringBoot中如何集成ThymeLeaf呢?

转自: SpringBoot中如何集成ThymeLeaf呢&#xff1f; 下文笔者将讲述SpringBoot集成ThymeLeaf的方法&#xff0c;如下所示: 实现思路:1.在pom.xml中引入ThymeLeaf的相关依赖2.在Templates文件夹下编写相应的模板文件例: 1.pom.xml 添加ThymeLeaf依赖<!-- ThymeLeaf 依赖…

河南某商务楼BA系统设计

目 录 第一章 概述 3 第二章 设计任务与要求 4 第三章 设计依据和规范 4 第四章 系统设计 5 4.1系统选型 5 4.2 I/O点位设计 7 4.2.1暖通空调系统 11 4.2.2给排水系统 13 4.2.3电气系统 15 4.3线缆选型设计 17 4.4供电接地设计 17 4.5中央控制室设计 18 第五章 设备清单配置 18…

ASEMI代理力特LSIC1MO120E0080碳化硅MOSFET

编辑-Z 力特碳化硅MOS管LSIC1MO120E0080参数&#xff1a; 型号&#xff1a;LSIC1MO120E0080 漏极-源极电压&#xff08;VDS&#xff09;&#xff1a;1200V 连续漏电流&#xff08;ID&#xff09;&#xff1a;25A 功耗&#xff08;PD&#xff09;&#xff1a;214W 工作结温…

mysql数据库日志

1、日志类型 mysql日志在mysql事务章有事务日志相关的记录。初次之外&#xff0c;MySQL有不同类型的日志文件&#xff0c;用来存储不同类型的日志&#xff0c;分为二进制日志 、 错误日志 通用查询日志和 查询日志 &#xff0c;这也是常用的4种。MySQL 8又新增两种支持的日志&…

关于HTTPDNS,你知道多少?

导读&#xff1a; 全网域名劫持率高&#xff0c;域名解析失败、解析超时&#xff0c;IP调度不精准&#xff0c;域名解析变更生效不实时&#xff0c;这些问题是否一直困扰着你&#xff1f;作为网络请求最前置的环节&#xff0c;域名解析的稳定与精准程度直接决定了APP的访问体验…

实战讲解SpringCloud网关接口限流SpringCloudGateway+Redis(图+文)

1 缘起 最近补充微服务网关相关知识&#xff0c;学习了网关相关概念&#xff0c; 了解网关在微服务中存在的意义及其使命&#xff0c;如统一用户认证、接口权限控制、接口限流、接口熔断、黑白名单机制等&#xff0c; 打算通过实践的方式逐步学习网关的相关功能&#xff0c;同…

从零到一落地接口自动化测试

前段时间写了一系列自动化测试的文章&#xff0c;更多是从方法和解决问题思路角度阐述我的观点。 昨天花了几个小时看完了陈磊老师的《接口测试入门课》&#xff0c;有一些新的收获&#xff0c;结合我自己实践自动化测试的一些经验以及个人理解&#xff0c;这篇文章来聊聊新手…

主要控制系统之间的逻辑关系

电力行业 工控安全解决思路保障框架从电力行业对工控安全需求看&#xff0c;电力企业在主要是以合规性建设为主&#xff0c;在 2004 年原电监会 5 号令颁布开始&#xff0c;大部 分的电厂控制系统安全 建设已经按照 5 号令的要求进行了整改&#xff0c;形成“安全分区、网络专…

【2022硬件设计开源盛宴】一年一度的hackaday大赛结束,冠军便携式风力涡轮机,共提交326个电子作品,奖金池15万美元

https://hackaday.com/2022/11/05/ ... -years-competition/ &#xff08;1&#xff09;一年一度的Hackaday大赛结束&#xff0c;今年是第9届了&#xff0c;总奖金池是15万美元&#xff0c;冠军5万美元。前6届&#xff0c;冠军奖金非常高&#xff0c;像第3届冠军是最厉害的&am…

java设计模式之装饰者模式

一&#xff1a;装饰者模式 1.什么是装饰者模式? 装饰模式是一种结构型设计模式&#xff0c; 允许你通过将对象放入包含行为的特殊封装对象中来为原对象绑定新的行为。 装饰者模式的基本介绍 1.装饰者模式&#xff1a;动态的将新功能附加到对象上。在对象功能扩展方面&#xf…

Jasper 中如何将数据拆成多行并跨行累计

【问题】 I have a query that returns some summary records. For instance, loan amount, loan term, interest rate. Then I want to have a second row that builds out the detailed payment schedule. so the report would look like this: Loan Amt Term …

SpringBoot2

文章目录1.简介1.1 SpringBoot优缺点1.2 官方文档结构2. SpringBoot入门2.1 HelloWord2.2 依赖管理2.3 自动配置2.4 容器功能组件添加原生配置文件引入2.5 配置绑定ConfigurationPropertiesEnableConfigurationProperties2.6 自动配置原理底层总结最佳实践2.7 开发小技巧Lombok…

UML类图简单认识

类 类图包括类、接口和关系。类中包含三元素&#xff0c;第一行是类名&#xff0c;如果是虚类则为斜体。第二行包括属性&#xff0c;如果是public则为&#xff0c;如果是private则为-&#xff0c;如果是protected则为#。第三行包括方法&#xff0c;方法前面的符号表示与属性的…