SpringBoot 整合Netty自定义消息协议

news2024/9/23 5:15:32

本文主要介绍springboot项目,配置netty进行消息通信,自定义的netty消息协议,本文消息主要以表格中进行

消息头消息体长度加密标识(可选)加密类型(可选)消息体标识消息体校验码
2字节2字节1字节(本文不包含)1字节(本文不包含)2字节N字节2字节(本文不包含)

1. 创建普通SpringBoot项目

1.1 引入pom.xml的依赖

<dependency>
   <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
    <version>2.3.2.RELEASE</version>
</dependency>
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.6.Final</version>
</dependency>
<dependency>
    <groupId>com.google.code.gson</groupId>
    <artifactId>gson</artifactId>
    <version>2.8.5</version>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.16.22</version>
</dependency>

1.2 增加application.yml文件

server:
  port: 8070
spring:
  application:
    name: netty-server
netty:
  port: 8090

1.3 创建工具类

public class GsonUtil {
    private static Gson gson = new GsonBuilder().disableHtmlEscaping().create();
    public static String toJson(Object obj) {
        return gson.toJson(obj);
    }
    public static <T> T fromJsonThrow(String str, Type type) {
        try {
            return gson.fromJson(str, type);
        }catch (Exception e){
            throw new RuntimeException(e.getMessage());
        }
    }
    public static <T> T fromJson(String str, Type type) {
        try {
            return gson.fromJson(str, type);
        }catch (Exception e){
            return null;
        }
    }
}

2. Netty 启动与消息处理

2.1 利用springboot项目启动时,启动netty

@Component
public class NettyInitializer implements InitializingBean {

    @Value("${netty.port:8090}")
    public int port;

    @Override
    public void afterPropertiesSet() throws Exception {
        start();
    }
    public void start(){
        //开启新的线程,启动netty
        new Thread(()->{
            NioEventLoopGroup bossgroup = new NioEventLoopGroup();
            NioEventLoopGroup workgroup = new NioEventLoopGroup();
            ServerBootstrap bootstrap = new ServerBootstrap();
            try {
                bootstrap.group(bossgroup,workgroup)
                        .channel(NioServerSocketChannel.class)
                        .option(ChannelOption.SO_BACKLOG,1024)
                        .childOption(ChannelOption.SO_KEEPALIVE,true)
                        .childHandler(new ChanneInitHandler());
                ChannelFuture channelFuture = bootstrap.bind(port).sync();
                channelFuture.channel().closeFuture().sync();
            }catch (Exception e){
                e.printStackTrace();
            }finally {
                bossgroup.shutdownGracefully();
                workgroup.shutdownGracefully();
            }
        }).start();
    }
}

2.2 创建消息链接缓存类
请注意这样写为了快,在实际情况中,可以按需修改,与设计

public class Transmission {
    private static Map<String, Channel> channelMap = new ConcurrentHashMap<>();
    private static Map<Integer, String> classMap = new ConcurrentHashMap<Integer, String>(){{
        put(LoginMessage.CODE,LoginMessage.class.getName());
        put(LoginRespMessage.CODE,LoginRespMessage.class.getName());
        put(SmsMessage.CODE,SmsMessage.class.getName());
    }};
    public static String getClassMap(Integer str){
        return classMap.get(str);
    }
    public static Channel getChannelMap(String str){
        return channelMap.get(str);
    }
    public static Map<String, Channel> getChannelMap(){
        return channelMap;
    }
    public static void setChannelMap(String str,Channel channel){
        channelMap.put(str,channel);
    }
    public static void removeChannelMap(String str){
        channelMap.remove(str);
    }

    public static void transtion(String name,AbstractMessage message){
        ByteBuf buffer = Unpooled.buffer();
        message.encode0(buffer);
        Channel channel = channelMap.get(name);
        channel.writeAndFlush(buffer);
    }

}

2.3 创建自定义协议的消息

/**
*创建消息抽象父类
**/
@Getter
@Setter
public abstract class AbstractMessage {
    public static int HAND = 0xA55A;//消息头
    private static int LEN = 0;//长度
    //解码
    public void decode0(ByteBuf buf){
        decode(buf);
    }
    //编码
    public void encode0(ByteBuf buf){
        buf.writeShort(HAND);
        buf.writeShort(LEN);
        buf.writeShort(code());
        encode(buf);
        int newLen = buf.readableBytes();
        buf.setShort(2,newLen);
    }
    abstract int code();//消息体标识
    abstract void decode(ByteBuf buf);//抽象解码
    abstract void encode(ByteBuf buf);//抽象编码

    public String decodeStr(ByteBuf buf,int len){
        StringBuilder builder = new StringBuilder();
        for (int i = 0; i < len; i++) {
            builder.append((char) buf.readByte());
        }
        return builder.toString();
    }
    public void encodeStr(ByteBuf buf,String str,int len){
        byte[] bytes = str.getBytes();
        for (int i = 0; i < len; i++) {
            buf.writeByte(bytes[i]);
        }
    }
    @Override
    public String toString() {
        return GsonUtil.toJson(this);
    }
}
@Setter
@Getter
public class SmsMessage extends AbstractMessage{
    public static int CODE = 0x00;
    private String msaage;
    @Override
    int code() {
        return CODE;
    }
    @Override
    void decode(ByteBuf buf) {
        msaage = buf.toString(CharsetUtil.UTF_8);
    }
    @Override
    void encode(ByteBuf buf) {
        buf.writeBytes(Unpooled.copiedBuffer(msaage,CharsetUtil.UTF_8));
    }
}
@Setter
@Getter
public class LoginMessage extends AbstractMessage {
    public static int CODE = 0x01;
    private int time;//登录时间戳
    private String name;//登录人姓名
    private String userId;//登录用户id
    private int type;//操作 1,2,3,4,5,6

    @Override
    int code() {return CODE;}
    @Override
    void decode(ByteBuf buf) {
        time = buf.readInt();
        name = decodeStr(buf,10);
        userId = decodeStr(buf,10);
        type = buf.readByte();
    }

    @Override
    void encode(ByteBuf buf) {
        buf.writeInt(time);
        encodeStr(buf,name,10);
        encodeStr(buf,userId,10);
        buf.writeByte(type);
    }
}
@Setter
@Getter
public class LoginRespMessage extends AbstractMessage {
    public static int CODE = 0x02;
    private int result;//操作结果 1,2,3
    private String userId;//登录用户id
    @Override
    int code() {return CODE;}
    @Override
    void decode(ByteBuf buf) {
        result = buf.readByte();
        userId = decodeStr(buf,10);
    }
    @Override
    void encode(ByteBuf buf) {
        buf.writeByte(result);
        encodeStr(buf,userId,10);
    }
}

2.3 创建处理消息的 handler

@Slf4j
public class ReadHandler extends SimpleChannelInboundHandler<ByteBuf> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        log.info("read from "+ctx.channel().remoteAddress()+" data : "+ ByteBufUtil.hexDump(msg));
        msg.retain();
        ctx.fireChannelRead(msg);
    }
}
@Slf4j
public class WriteHandler extends ChannelOutboundHandlerAdapter {
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        if(msg instanceof ByteBuf){
            ByteBuf buf = (ByteBuf) msg;
            log.info("write to "+ctx.channel().remoteAddress()+"  data : "+ ByteBufUtil.hexDump(buf));
        }
        super.write(ctx, msg, promise);
    }
}
@Slf4j
public class LoginMessageHandler extends SimpleChannelInboundHandler<LoginMessage> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, LoginMessage msg) throws Exception {
      log.info("LoginMessage : "+msg);
    }
}
@Slf4j
public class SmsMessageHandler extends SimpleChannelInboundHandler<SmsMessage> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, SmsMessage msg) throws Exception {
        log.info(ctx.name()+" " + msg);
    }
}
//请注意 这里的 Transmission 类是用来简单展示链接的channel的,为了方便放在这里
@Slf4j
public class DecodeHandler extends SimpleChannelInboundHandler<ByteBuf> {
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        Transmission.setChannelMap(ctx.name(),ctx.channel());
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        AbstractMessage message = doDecode(msg);
        if(message == null){
            return;
        }
        ctx.fireChannelRead(message);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);
        Transmission.removeChannelMap(ctx.name());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        super.exceptionCaught(ctx, cause);
    }
    //消息解码为对应的消息实体
    public AbstractMessage doDecode(ByteBuf buf){
        ByteBuf buffer = Unpooled.buffer();
        if (buf.readUnsignedShort() == AbstractMessage.HAND) {
            short len = buf.readShort();
            buf.readBytes(buffer,len-4);
            int code = buffer.readUnsignedShort();
            String classMap = Transmission.getClassMap(code);
            if(classMap == null){
                log.warn("class not found");
                return null;
            }
            try {
                Class<?> clazz = Class.forName(classMap);
                AbstractMessage message = (AbstractMessage) clazz.newInstance();
                message.decode0(buffer);
                return message;
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
        buffer.clear();
        return null;
    }
}

2.4 创建 ChannelInitializer 继承类

public class ChanneInitHandler extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new IdleStateHandler(30,30,30, TimeUnit.SECONDS));
        pipeline.addLast(new ReadHandler());
        pipeline.addLast(new WriteHandler());
        pipeline.addLast(new DecodeHandler());
        pipeline.addLast(new LoginMessageHandler());
        pipeline.addLast(new SmsMessageHandler());
    }
}

3. 创建 web接口

@RestController
public class MonitorController {
    @GetMapping("/getmsg")
    public Object getmsg(){
        Map<String, Channel> channelMap = Transmission.getChannelMap();
        return channelMap;
    }
    @GetMapping("/sendmsg")
    public Object sendmsg(String msg){
        Map<String, Channel> channelMap = Transmission.getChannelMap();
        if (channelMap != null && channelMap.size() > 0) {
            for (Map.Entry<String, Channel> entry : channelMap.entrySet()) {
                SmsMessage message = new SmsMessage();
                message.setMsaage(msg);
                Transmission.transtion(entry.getKey(),message);
            }
        }
        return channelMap;
    }
}

4. 创建客户端去连接

4.1 创建客户端连接

public class Client {
    public static void main(String[] args) throws Exception {
        NioEventLoopGroup loopGroup = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(loopGroup)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.SO_KEEPALIVE,true)
                .option(ChannelOption.SO_BACKLOG,1024)
                .handler(new ChannelInitClinet());
        bootstrap.connect("localhost",8090).sync();
    }
}

4.2 创建handler消息处理器

@Slf4j
public class DecodeHandler extends SimpleChannelInboundHandler<ByteBuf> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        AbstractMessage message = doDecode(msg);
        if(message == null){
            return;
        }
        ctx.fireChannelRead(message);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        super.exceptionCaught(ctx, cause);
    }

    public AbstractMessage doDecode(ByteBuf buf){
        ByteBuf buffer = Unpooled.buffer();
        if (buf.readUnsignedShort() == AbstractMessage.HAND) {
            short len = buf.readShort();
            buf.readBytes(buffer,len-4);
            int code = buffer.readUnsignedShort();
            String classMap = Transmission.getClassMap(code);
            if(classMap == null){
                log.warn("class not found");
                return null;
            }
            try {
                Class<?> clazz = Class.forName(classMap);
                AbstractMessage message = (AbstractMessage) clazz.newInstance();
                message.decode0(buffer);
                return message;
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
        buffer.clear();
        return null;
    }
}
@Slf4j
public class SmsClientHandler extends SimpleChannelInboundHandler<SmsMessage> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, SmsMessage msg) throws Exception {
        log.info(ctx.name()+" " + msg);
    }
 }

4.3 创建ChannelInitializer 消息处理器

public class ChannelInitClinet extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new IdleStateHandler(30,30,30, TimeUnit.SECONDS));
        pipeline.addLast(new ReadHandler());
        pipeline.addLast(new WriteHandler());
        pipeline.addLast(new DecodeHandler());
        pipeline.addLast(new SmsClientHandler());
    }
}

5. 测试连接与消息发送

5.1 启动服务端与客户端
5.1.1 在服务端调用接口,发现已经有一个连接了
在这里插入图片描述
5.1.2 代用服务端 发送消息
在这里插入图片描述
5.1.3 客户端以接受到消息
在这里插入图片描述
5.1.4 停掉客户端,查看服务端缓存的channl
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
到这里算是ok了
以上是对自定义消息的简介,大家可以参考

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/98725.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

SAP ABAP——SAP简介(一)【SAP发展历程】

&#x1f482;作者简介&#xff1a; THUNDER王&#xff0c;一名热爱财税和SAP ABAP编程以及热爱分享的博主。目前于江西师范大学会计学专业大二本科在读&#xff0c;同时任汉硕云&#xff08;广东&#xff09;科技有限公司ABAP开发顾问。在学习工作中&#xff0c;我通常使用偏后…

研究必备的 5 个外文文献检索网站

1. Google scholar 网址&#xff1a; https://scholar.google.com.hk/?hlzh-CN 如今搜索论文的首选&#xff0c;可以在这里查看论文统计和引用参考文献&#xff0c;还能通过关注作者或者论文获得新论文更新提醒&#xff0c;以及利用自动化推荐来提供一个基本库 2. DBLP 网址…

MSVC C++ UTF-8编程

除windows平台外大部分其他平台&#xff0c;编译器默认使用的编码都是UTF-8编码&#xff0c;最新版本的Clang编译器只支持UTF-8编码。如果程序需要在多个平台编译运行&#xff0c;则代码必须使用UTF-8。使用UTF-8可以更容易的在多字节字符串(char, std::string)和宽字符(wchar_…

Java+SSM汽车租赁系统汽车出租(含源码+论文+答辩PPT等)

项目功能简介: 该项目采用的技术实现如下 后台框架&#xff1a;Spring、SpringMVC、MyBatis UI界面&#xff1a;jQuery 、JSP 数据库&#xff1a;MySQL 系统功能 系统分为前台用户租车和后台系统管理&#xff1a; 1.前台用户租车 用户注册、用户登录、用户中心、浏览车辆、车辆…

Java项目:SSM在线二手图书交易商城网站平台

作者主页&#xff1a;源码空间站2022 简介&#xff1a;Java领域优质创作者、Java项目、学习资料、技术互助 文末获取源码 项目介绍 用户角色包含以下功能&#xff1a; 用户登录,查看商品详情,按分类查看,查看我的书架,上传二手书等功能。 由于本程序规模不大&#xff0c;可供课…

三、CAM可解释性分析——可解释性机器学习(DataWhale组队学习)

文章目录前言CAM算法的精妙之处相关工作CAM算法其它相关问题为什么不用池化操作&#xff1f;CAM的优点CAM算法的缺点扩展阅读和思考题前言 CAM算法奠定了可解释分析的基石 CAM算法的精妙之处 对深度学习实现可解释性分析、显著性分析可扩展性强&#xff0c;后续衍生出各种…

域名备案怎么查?怎么批量查询域名备案

ICP备案&#xff0c;是为了防止在网上从事非法的网站经营活动&#xff0c;打击不良互联网信息的传播&#xff0c;国家对互联网信息服务实行的备案制度。 备案的目的就是为了防止在网上从事非法的网站经营活动&#xff0c;打击不良互联网信息的传播&#xff0c;如果网站不备…

Android TP驱动模型框架分析

本文主要是对TP驱动框架的学习。 一、概述 1、触摸IC的工作原理 tp与主控间的通信接口一般是i2c&#xff0c;即scl、sda、gnd、vcc。在正常工作时需要加上rst、int脚。 整个过程是&#xff1a;通过点击屏幕&#xff0c;tp ic端会将int 脚电平拉低&#xff0c;等待主控的读取。…

【技术分享】Anaconda下载、安装、pip切换镜像源、conda切换镜像、conda创建指定Python版本虚拟环境教程

文章目录1.下载Anaconda1.1.下载最新版本Anaconda1.2.下载历史版本的Anaconda2.安装Anaconda3.conda切换镜像源4.pip切换镜像源5.conda创建指定版本Python环境1.下载Anaconda 1.1.下载最新版本Anaconda 步骤&#xff1a; 进入Anaconda官网&#xff0c;点击Download按钮下载最…

海量数据小内存!如何找到高频数

文章目录题目解答总结题目 如何在 20 亿个无符号整数中找到出现次数最多的那个数&#xff0c;在只提供 1 G 内存的条件下 解答 找到出现次数最多的数&#xff0c;通常的思维就是使用 HashMap 来统计这 20 亿个无符号整数中每个数出现的次数 已知只有 20 亿个数&#xff0c;…

b站黑马的Vue快速入门案例代码——【axios+Vue2】悦听player(音乐播放器)

目录 本文中修改的原代码中的BUG&#xff1a; 修改方法&#xff1a; 本文案例代码仍有的BUG&#xff1a;&#xff08;欢迎大家献计献策&#xff09; 目标效果&#xff1a; 悦音player案例——效果展示视频&#xff1a; 更换的新接口/参数&#xff1a; 1.歌曲搜索接口&…

实战讲解及分析Spring新建Bean的几种方式以及创建过程(图+文+源码)

1 缘起 作为一个应用开发人员而言&#xff0c;会使用某一个工具分为两个层次&#xff08;个人观点&#xff09;&#xff1a; 第一个层次&#xff0c;知道工具&#xff0c;会使用这个工具解决问题&#xff1b; 第二个层次&#xff0c;理解工具的实现原理。 关于Spring的学习&am…

Linux Centos7 磁盘的分区、挂载

1、前言 注&#xff1a;看不懂的同学可以直接跟着后面的步骤操作 一块新的磁盘放到电脑上&#xff0c;要经过分区-->给分区设置文件系统--->挂载才能用。 也就是说要想将磁盘挂载&#xff0c;必须完成给磁盘分区和给分区设置文件系统这两步。 分区的时候先分成主分区和扩…

【DBN分类】基于matlab深度置信网络DBN变压器故障诊断【含Matlab源码 2284期】

一、深度置信网络DBN变压器故障诊断简介 1 DBN模型 DBN是深度学习中最关键的一个多层网络架构&#xff0c;如图2所示&#xff0c;由多层RBM堆叠而成&#xff0c;前一层RBM的输出为后一层RBM的输入&#xff0c;最顶层采用Softmax分类器作为标签层&#xff0c;输出分类识别的结果…

AD-DA转换(PCF8591)

AD转换目录一、AD转换&#xff08;PCF8591&#xff09;①初始化函数②读取ADC值的函数二、DA转换&#xff08;PCF8591&#xff09;三、STC15系列单片机用户手册.pdf—第10章一、AD转换&#xff08;PCF8591&#xff09; 思路&#xff1a;&#xff08;66&#xff0c;两个地址0x90…

RNA-seq——上游分析练习2(数据下载+trim-galore+hisat2+samtools+featureCounts)

目录软件安装新建文件夹一、下载数据二、质控过滤1.数据质量检测2.数据质量控制3.对处理后的数据再次QC三、序列比对1.hisat2比对2.flagstat检查一下结果四、featureCounts定量写在前面——本文是转录组上游分析的实战练习。主要包含四个步骤&#xff1a; 数据下载&#xff08…

DockerCompose编排Redis6.2.6以及遇到的那些坑

场景 Docker中使用Dockerfile的方式部署SpringBootVue前后端分离的项目(若依前后端分离框架为例): Docker中使用Dockerfile的方式部署SpringBootVue前后端分离的项目(若依前后端分离框架为例)_霸道流氓气质的博客-CSDN博客_若依 dockerfile 在上面使用Dockerfile分别构建每个…

Heron‘s formula

In geometry, Heron’s formula (or Hero’s formula) gives the area A of a triangle in terms of the three side lengths a, b, c. If {\textstyle s{\tfrac {1}{2}}(abc)}{\textstyle s{\tfrac {1}{2}}(abc)} is the semiperimeter of the triangle, the area is,[1] {\d…

影视中学职场套路——《如懿传》中职场生存法则

目录 一、老板决定的事&#xff0c;赞成不赞成都要执行 二、居人之下&#xff0c;聪明劲儿别往外露 三、切忌大庭广众直接与上级冲突 四、取悦所有人&#xff0c;不如取悦最大的boss 五、再强的人&#xff0c;也需要团队作战 六、人善被人欺&#xff08;首先要自保&#…

第三十一章 linux-模块的加载过程一

第三十一章 linux-模块的加载过程一 文章目录第三十一章 linux-模块的加载过程一sys_init_modulestruct moduleload_module模块ELF静态的内存视图字符串表&#xff08;string Table)HDR视图的第一次改写find_sec函数ps:kernel symbol内核符号表&#xff0c;就是在内核的内部函数…