由浅入深Netty协议设计与解析

news2024/11/15 21:28:09

目录

  • 1 为什么需要协议?
  • 2 redis 协议举例
  • 3 http 协议举例
  • 4 自定义协议要素
    • 4.1 编解码器
    • 4.2 什么时候可以加 @Sharable


1 为什么需要协议?

在这里插入图片描述

TCP/IP 中消息传输基于流的方式,没有边界。

协议的目的就是划定消息的边界,制定通信双方要共同遵守的通信规则

例如:在网络上传输

下雨天留客天留我不留

是中文一句著名的无标点符号句子,在没有标点符号情况下,这句话有数种拆解方式,而意思却是完全不同,所以常被用作讲述标点符号的重要性

一种解读

下雨天留客,天留,我不留

另一种解读

下雨天,留客天,留我不?留

如何设计协议呢?其实就是给网络传输的信息加上“标点符号”。但通过分隔符来断句不是很好,因为分隔符本身如果用于传输,那么必须加以区分。因此,下面一种协议较为常用

定长字节表示内容长度 + 实际内容

例如,假设一个中文字符长度为 3,按照上述协议的规则,发送信息方式如下,就不会被接收方弄错意思了

0f下雨天留客06天留09我不留

小故事

很久很久以前,一位私塾先生到一家任教。双方签订了一纸协议:“无鸡鸭亦可无鱼肉亦可白菜豆腐不可少不得束修金”。此后,私塾先生虽然认真教课,但主人家则总是给私塾先生以白菜豆腐为菜,丝毫未见鸡鸭鱼肉的款待。私塾先生先是很不解,可是后来也就想通了:主人把鸡鸭鱼肉的钱都会换为束修金的,也罢。至此双方相安无事。

年关将至,一个学年段亦告结束。私塾先生临行时,也不见主人家为他交付束修金,遂与主家理论。然主家亦振振有词:“有协议为证——无鸡鸭亦可,无鱼肉亦可,白菜豆腐不可少,不得束修金。这白纸黑字明摆着的,你有什么要说的呢?”

私塾先生据理力争:“协议是这样的——无鸡,鸭亦可;无鱼,肉亦可;白菜豆腐不可,少不得束修金。”

双方唇枪舌战,你来我往,真个是不亦乐乎!

这里的束修金,也作“束脩”,应当是泛指教师应当得到的报酬

2 redis 协议举例

NioEventLoopGroup worker = new NioEventLoopGroup();
byte[] LINE = {13, 10};
try {
    Bootstrap bootstrap = new Bootstrap();
    bootstrap.channel(NioSocketChannel.class);
    bootstrap.group(worker);
    bootstrap.handler(new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel ch) {
            ch.pipeline().addLast(new LoggingHandler());
            ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                // 会在连接 channel 建立成功后,会触发 active 事件
                @Override
                public void channelActive(ChannelHandlerContext ctx) {
                    set(ctx);
                    get(ctx);
                }
                private void get(ChannelHandlerContext ctx) {
                    ByteBuf buf = ctx.alloc().buffer();
                    buf.writeBytes("*2".getBytes());
                    buf.writeBytes(LINE);
                    buf.writeBytes("$3".getBytes());
                    buf.writeBytes(LINE);
                    buf.writeBytes("get".getBytes());
                    buf.writeBytes(LINE);
                    buf.writeBytes("$3".getBytes());
                    buf.writeBytes(LINE);
                    buf.writeBytes("aaa".getBytes());
                    buf.writeBytes(LINE);
                    ctx.writeAndFlush(buf);
                }
                private void set(ChannelHandlerContext ctx) {
                    ByteBuf buf = ctx.alloc().buffer();
                    buf.writeBytes("*3".getBytes());
                    buf.writeBytes(LINE);
                    buf.writeBytes("$3".getBytes());
                    buf.writeBytes(LINE);
                    buf.writeBytes("set".getBytes());
                    buf.writeBytes(LINE);
                    buf.writeBytes("$3".getBytes());
                    buf.writeBytes(LINE);
                    buf.writeBytes("aaa".getBytes());
                    buf.writeBytes(LINE);
                    buf.writeBytes("$3".getBytes());
                    buf.writeBytes(LINE);
                    buf.writeBytes("bbb".getBytes());
                    buf.writeBytes(LINE);
                    ctx.writeAndFlush(buf);
                }

                @Override
                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                    ByteBuf buf = (ByteBuf) msg;
                    System.out.println(buf.toString(Charset.defaultCharset()));
                }
            });
        }
    });
    ChannelFuture channelFuture = bootstrap.connect("localhost", 6379).sync();
    channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
    log.error("client error", e);
} finally {
    worker.shutdownGracefully();
}

3 http 协议举例

NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
    ServerBootstrap serverBootstrap = new ServerBootstrap();
    serverBootstrap.channel(NioServerSocketChannel.class);
    serverBootstrap.group(boss, worker);
    serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
            ch.pipeline().addLast(new HttpServerCodec());
            ch.pipeline().addLast(new SimpleChannelInboundHandler<HttpRequest>() {
                @Override
                protected void channelRead0(ChannelHandlerContext ctx, HttpRequest msg) throws Exception {
                    // 获取请求
                    log.debug(msg.uri());

                    // 返回响应
                    DefaultFullHttpResponse response =
                            new DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK);

                    byte[] bytes = "<h1>Hello, world!</h1>".getBytes();

                    response.headers().setInt(CONTENT_LENGTH, bytes.length);
                    response.content().writeBytes(bytes);

                    // 写回响应
                    ctx.writeAndFlush(response);
                }
            });
            /*ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                @Override
                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                    log.debug("{}", msg.getClass());

                    if (msg instanceof HttpRequest) { // 请求行,请求头

                    } else if (msg instanceof HttpContent) { //请求体

                    }
                }
            });*/
        }
    });
    ChannelFuture channelFuture = serverBootstrap.bind(8080).sync();
    channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
    log.error("server error", e);
} finally {
    boss.shutdownGracefully();
    worker.shutdownGracefully();
}

4 自定义协议要素

  • 魔数,用来在第一时间判定是否是无效数据包
  • 版本号,可以支持协议的升级
  • 序列化算法,消息正文到底采用哪种序列化反序列化方式,可以由此扩展,例如:json、protobuf、hessian、jdk
  • 指令类型,是登录、注册、单聊、群聊… 跟业务相关
  • 请求序号,为了双工通信,提供异步能力
  • 正文长度
  • 消息正文

4.1 编解码器

根据上面的要素,设计一个登录请求消息和登录响应消息,并使用 Netty 完成收发

@Slf4j
public class MessageCodec extends ByteToMessageCodec<Message> {

    @Override
    protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {
        // 1. 4 字节的魔数
        out.writeBytes(new byte[]{1, 2, 3, 4});
        // 2. 1 字节的版本,
        out.writeByte(1);
        // 3. 1 字节的序列化方式 jdk 0 , json 1
        out.writeByte(0);
        // 4. 1 字节的指令类型
        out.writeByte(msg.getMessageType());
        // 5. 4 个字节
        out.writeInt(msg.getSequenceId());
        // 无意义,对齐填充
        out.writeByte(0xff);
        // 6. 获取内容的字节数组
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(bos);
        oos.writeObject(msg);
        byte[] bytes = bos.toByteArray();
        // 7. 长度
        out.writeInt(bytes.length);
        // 8. 写入内容
        out.writeBytes(bytes);
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        int magicNum = in.readInt();
        byte version = in.readByte();
        byte serializerType = in.readByte();
        byte messageType = in.readByte();
        int sequenceId = in.readInt();
        in.readByte();
        int length = in.readInt();
        byte[] bytes = new byte[length];
        in.readBytes(bytes, 0, length);
        ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
        Message message = (Message) ois.readObject();
        log.debug("{}, {}, {}, {}, {}, {}", magicNum, version, serializerType, messageType, sequenceId, length);
        log.debug("{}", message);
        out.add(message);
    }
}

测试

EmbeddedChannel channel = new EmbeddedChannel(
    new LoggingHandler(),
    new LengthFieldBasedFrameDecoder(
        1024, 12, 4, 0, 0),
    new MessageCodec()
);
// encode
LoginRequestMessage message = new LoginRequestMessage("zhangsan", "123", "张三");
//        channel.writeOutbound(message);
// decode
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();
new MessageCodec().encode(null, message, buf);

ByteBuf s1 = buf.slice(0, 100);
ByteBuf s2 = buf.slice(100, buf.readableBytes() - 100);
s1.retain(); // 引用计数 2
channel.writeInbound(s1); // release 1
channel.writeInbound(s2);

解读

在这里插入图片描述

4.2 什么时候可以加 @Sharable

  • 当 handler 不保存状态时,就可以安全地在多线程下被共享
  • 但要注意对于编解码器类,不能继承 ByteToMessageCodec 或 CombinedChannelDuplexHandler 父类,他们的构造方法对 @Sharable 有限制
  • 如果能确保编解码器不会保存状态,可以继承 MessageToMessageCodec 父类
@Slf4j
@ChannelHandler.Sharable
/**
 * 必须和 LengthFieldBasedFrameDecoder 一起使用,确保接到的 ByteBuf 消息是完整的
 */
public class MessageCodecSharable extends MessageToMessageCodec<ByteBuf, Message> {
    @Override
    protected void encode(ChannelHandlerContext ctx, Message msg, List<Object> outList) throws Exception {
        ByteBuf out = ctx.alloc().buffer();
        // 1. 4 字节的魔数
        out.writeBytes(new byte[]{1, 2, 3, 4});
        // 2. 1 字节的版本,
        out.writeByte(1);
        // 3. 1 字节的序列化方式 jdk 0 , json 1
        out.writeByte(0);
        // 4. 1 字节的指令类型
        out.writeByte(msg.getMessageType());
        // 5. 4 个字节
        out.writeInt(msg.getSequenceId());
        // 无意义,对齐填充
        out.writeByte(0xff);
        // 6. 获取内容的字节数组
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(bos);
        oos.writeObject(msg);
        byte[] bytes = bos.toByteArray();
        // 7. 长度
        out.writeInt(bytes.length);
        // 8. 写入内容
        out.writeBytes(bytes);
        outList.add(out);
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        int magicNum = in.readInt();
        byte version = in.readByte();
        byte serializerType = in.readByte();
        byte messageType = in.readByte();
        int sequenceId = in.readInt();
        in.readByte();
        int length = in.readInt();
        byte[] bytes = new byte[length];
        in.readBytes(bytes, 0, length);
        ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
        Message message = (Message) ois.readObject();
        log.debug("{}, {}, {}, {}, {}, {}", magicNum, version, serializerType, messageType, sequenceId, length);
        log.debug("{}", message);
        out.add(message);
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/547360.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

每日学术速递5.18

CV - 计算机视觉 | ML - 机器学习 | RL - 强化学习 | NLP 自然语言处理 Subjects: cs.CV 1.Make-A-Protagonist: Generic Video Editing with An Ensemble of Experts 标题&#xff1a;Make-A-Protagonist&#xff1a;与专家合奏的通用视频编辑 作者&#xff1a;Yuyang Z…

云端一体助力体验升级和业务创新

随着音视频和AI技术的发展&#xff0c;在满足用户基础体验和需求情况下&#xff0c;更极致的用户体验和更丰富的互动玩法&#xff0c;成为各个平台打造核心竞争力的关键。LiveVideoStackCon 2022 北京站邀请到火山引擎视频云华南区业务负责人——张培垒&#xff0c;基于节跳动音…

虚幻引擎4利用粒子系统实现物体轨迹描绘

虚幻引擎4利用粒子系统实现物体轨迹描绘 目录 虚幻引擎4利用粒子系统实现物体轨迹描绘前言粒子系统利用粒子系统实现物体轨迹描绘创建粒子系统将粒子系统的产生位置绑定到运动物体上 小结 前言 由于在物体运动时&#xff0c;想要观察其总的运动轨迹&#xff0c;以便对其控制做…

Java实现天气预报功能

如果要实现类似百度天气、手机App这样的天气预报功能该如何实现&#xff1f;首先想到的是百度... 背景&#xff1a; 最近公司做了一个项目&#xff0c;天气预报的功能也做上去了&#xff0c;不仅有实时天气、未来7天预报的功能、还有气象预警的功能。 天气包括基本天气、白天夜…

【K8s】什么是helm?helm的常用指令

文章目录 一、Helm介绍1、背景2、介绍3、核心概念4、chart的基本结构5、helm官网 二、部署Helm1、安装helm客户端2、安装Tiller 三、常用指令1、仓库相关 helm repo2、chart相关3、release相关 四、入门案例1、构建第一个chart2、将chart包发布到Repository3、在 Kubernetes 中…

Nacos之服务注册中心

1.Nacos之服务提供者注册 官方文档 1.1.前期工作 1.1.1.新建Module - api-commons POM <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0"xmlns:xsi"http://www.w3.org/2001/XMLSc…

区块链商业模式

1. 引言 web2 vs web3&#xff1a; 基于区块链的商业模式有&#xff1a; 1&#xff09;Token Economy-Utility Token商业模式2&#xff09;Blockchain As A Service&#xff08;Baas&#xff09;商业模式3&#xff09;Blockchain-Based Software Products商业模式4&#xf…

【C++修炼之路】30.可变参数模板包装器

每一个不曾起舞的日子都是对生命的辜负 C11之可变参数模板&&包装器 前言一.可变参数模板的首次登场二.参数包展开2.1 递归函数方式展开参数包2.2 逗号表达式展开参数包 三.容器的emplace方法四.包装器4.1 什么是function4.2 function包装器的作用4.3 function的实际用途…

使用Redis实现短信验证码登录功能

一、概述 目前微信小程序或网站的登录方式大部分采取了微信扫码或短信验证码等方式&#xff0c;为什么短信验证码登录方式会受到互联网公司的青睐&#xff0c;因为其确实有许多好处&#xff1a; 方便快捷&#xff1a;用户无需记忆复杂的用户名和密码&#xff0c;只需通过短信…

Python共享文件 - Python快速搭建HTTP web服务实现文件共享并公网远程访问

文章目录 1. 前言2. 视频教程3. 本地文件服务器搭建3.1 python的安装和设置3.2 cpolar的安装和注册 4. 本地文件服务器的发布4.1 Cpolar云端设置4.2 Cpolar本地设置 5. 公网访问测试6. 结语 转载自内网穿透工具的文章&#xff1a;Python一行代码实现文件共享【内网穿透公网访问…

全域兴趣电商:国货品牌的新策略、新玩法

【潮汐商业评论/原创】 消费的方向标已经变了。 在消费市场的滚滚浪潮里&#xff0c;国人的“衣食住行”在全面的“国货化”&#xff0c;一个个有颜值有实力的国货品牌如雨后春笋般出现在寻常百姓家&#xff0c;如今在这片肥沃的土壤上正结出适合国人使用的果实。 01 国货二…

Openai+Coursera: ChatGPT Prompt Engineering(二)

这是我写的ChatGPT Prompt Engineerin的第二篇博客&#xff0c;如何还没看过第一篇的请先看我写的第一篇博客&#xff1a; ChatGPT Prompt Engineerin(一) Summarizing(总结/摘要&#xff09; 今天我们的重点关注按特定主题来总结文本。 设置参数 import openai openai.api_…

【备战秋招】每日一题:3月18日美团春招第二题:题面+题目思路 + C++/python/js/Go/java 带注释

2023大厂笔试模拟练习网站&#xff08;含题解&#xff09; www.codefun2000.com 最近我们一直在将收集到的各种大厂笔试的解题思路还原成题目并制作数据&#xff0c;挂载到我们的OJ上&#xff0c;供大家学习交流&#xff0c;体会笔试难度。现已录入200道互联网大厂模拟练习题&…

深入理解递归算法

文章目录 概述单路递归 Single RecursionE01. 阶乘E02. 反向打印字符串E03. 二分查找 多路递归 Multi RecursionE01. 斐波那契数列 递归优化-记忆法递归优化-尾递归递归时间复杂度-Master theorem递归时间复杂度-展开求解 概述 定义 计算机科学中&#xff0c;递归是一种解决计…

Unity UI -- (5)增加基础按钮功能

分析分析一些常见UI 良好的UI设计会清晰地和用户沟通。用户知道他们能和屏幕上哪些东西交互&#xff0c;哪些不能。如果他们进行了交互&#xff0c;他们也要清楚地知道交互是否成功。换句话说&#xff0c;UI要提供给用户很多反馈。 我们可以来看看在Unity里或者在计算机上的任何…

一款适合国内多场景的免费ChatGPT镜像网站【建议收藏】

随着人工智能技术的不断进步&#xff0c;智能问答系统正逐渐成为我们生活中必不可少的助手。而在这个领域中&#xff0c;ChatGPT中文版-知否AI问答凭借其出色的性能和广泛的应用场景&#xff0c;成为了引领智能问答新时代的重要代表。本文将带您深入了解ChatGPT中文版-知否AI问…

LabVIEWCompactRIO 开发指南25 实施LabVIEW FPGA代码的方法

LabVIEWCompactRIO 开发指南25 实施LabVIEW FPGA代码的方法 开始开发时&#xff0c;应在LabVIEW项目的FPGA目标下创建VI&#xff0c;以便使用LabVIEW FPGA选板进行编程&#xff0c;该选板是LabVIEW选板的子集&#xff0c;包括一些LabVIEW FPGA特定函数。 应该在仿真模式下开…

每日一个MySQL知识点:主从表大小相差巨大和一个BUG

一、主从相同表空间相差巨大 1.1 问题描述 我们知道MySQL主从基本上是逻辑的复制&#xff0c;那么有少量的空间差异没有问题&#xff0c;但是本案例主库表只有10G&#xff0c;但是从库表有100G&#xff0c;这么大的差距比较少见&#xff0c;需要分析原因。 1.2 问题分析 实…

ResNet (深度残差网络)

ResNet 算法概述 解决的核心问题&#xff1a;网络的退化现象 网络层数在变深之后&#xff0c;性能不如浅层时候的性能 。注意&#xff1a;网络退化既不是梯度消失也不是梯度爆炸。 那是如何解决退化现象的呢&#xff1f;引入残差模块 把模型的输入分成两条路&#xff1a;右边…

SQL 大全(四)|数据库迁移升级时常用 SQL 语句

作者 | JiekeXu 来源 |公众号 JiekeXu DBA之路&#xff08;ID: JiekeXu_IT&#xff09; 如需转载请联系授权 | (个人微信 ID&#xff1a;JiekeXu_DBA) 大家好&#xff0c;我是 JiekeXu,很高兴又和大家见面了,今天和大家一起来看看SQL 大全&#xff08;四&#xff09;|数据库迁移…