从Redis、HTTP协议,看Nett协议设计,我发现了个惊天大秘密

news2024/11/16 23:45:06

1. 协议的作用

TCP/IP 中消息传输基于流的方式,没有边界

协议的目的就是划定消息的边界,制定通信双方要共同遵守的通信规则

2. Redis 协议

如果我们要向 Redis 服务器发送一条 set name Nyima 的指令,需要遵守如下协议

// 该指令一共有3部分,每条指令之后都要添加回车与换行符
*3\r\n
// 第一个指令的长度是3
$3\r\n
// 第一个指令是set指令
set\r\n
// 下面的指令以此类推
$4\r\n
name\r\n
$5\r\n
Nyima\r\n

客户端代码如下

public class RedisClient {
    static final Logger log = LoggerFactory.getLogger(StudyServer.class);
    public static void main(String[] args) {
        NioEventLoopGroup group =  new NioEventLoopGroup();
        try {
            ChannelFuture channelFuture = new Bootstrap()
                    .group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            // 打印日志
                            ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
                            ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                                @Override
                                public void channelActive(ChannelHandlerContext ctx) throws Exception {
                                    // 回车与换行符
                                    final byte[] LINE = {'\r','\n'};
                                    // 获得ByteBuf
                                    ByteBuf buffer = ctx.alloc().buffer();
                                    // 连接建立后,向Redis中发送一条指令,注意添加回车与换行
                                    // set name Nyima
                                    buffer.writeBytes("*3".getBytes());
                                    buffer.writeBytes(LINE);
                                    buffer.writeBytes("$3".getBytes());
                                    buffer.writeBytes(LINE);
                                    buffer.writeBytes("set".getBytes());
                                    buffer.writeBytes(LINE);
                                    buffer.writeBytes("$4".getBytes());
                                    buffer.writeBytes(LINE);
                                    buffer.writeBytes("name".getBytes());
                                    buffer.writeBytes(LINE);
                                    buffer.writeBytes("$5".getBytes());
                                    buffer.writeBytes(LINE);
                                    buffer.writeBytes("Nyima".getBytes());
                                    buffer.writeBytes(LINE);
                                    ctx.writeAndFlush(buffer);
                                }

                            });
                        }
                    })
                    .connect(new InetSocketAddress("localhost", 6379));
            channelFuture.sync();
            // 关闭channel
            channelFuture.channel().close().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            // 关闭group
            group.shutdownGracefully();
        }
    }
}

控制台打印结果

1600 [nioEventLoopGroup-2-1] DEBUG io.netty.handler.logging.LoggingHandler  - [id: 0x28c994f1, L:/127.0.0.1:60792 - R:localhost/127.0.0.1:6379] WRITE: 34B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 2a 33 0d 0a 24 33 0d 0a 73 65 74 0d 0a 24 34 0d |*3..$3..set..$4.|
|00000010| 0a 6e 61 6d 65 0d 0a 24 35 0d 0a 4e 79 69 6d 61 |.name..$5..Nyima|
|00000020| 0d 0a                                           |..              |
+--------+-------------------------------------------------+----------------+

Redis 中查询执行结果

file

3. HTTP 协议

HTTP 协议在请求行请求头中都有很多的内容,自己实现较为困难,可以使用 HttpServerCodec 作为服务器端的解码器与编码器,来处理 HTTP 请求

// HttpServerCodec 中既有请求的解码器 HttpRequestDecoder 又有响应的编码器 HttpResponseEncoder
// Codec(CodeCombine) 一般代表该类既作为 编码器 又作为 解码器
public final class HttpServerCodec extends CombinedChannelDuplexHandler<HttpRequestDecoder, HttpResponseEncoder>
        implements HttpServerUpgradeHandler.SourceCodec

服务器代码

public class HttpServer {
    static final Logger log = LoggerFactory.getLogger(StudyServer.class);

    public static void main(String[] args) {
        NioEventLoopGroup group = new NioEventLoopGroup();
        new ServerBootstrap()
                .group(group)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) {
                        ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
                        // 作为服务器,使用 HttpServerCodec 作为编码器与解码器
                        ch.pipeline().addLast(new HttpServerCodec());
                        // 服务器只处理HTTPRequest
                        ch.pipeline().addLast(new SimpleChannelInboundHandler<HttpRequest>() {
                            @Override
                            protected void channelRead0(ChannelHandlerContext ctx, HttpRequest msg) {
                                // 获得请求uri
                                log.debug(msg.uri());

                                // 获得完整响应,设置版本号与状态码
                                DefaultFullHttpResponse response = new DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK);
                                // 设置响应内容
                                byte[] bytes = "<h1>Hello, World!</h1>".getBytes(StandardCharsets.UTF_8);
                                // 设置响应体长度,避免浏览器一直接收响应内容
                                response.headers().setInt(CONTENT_LENGTH, bytes.length);
                                // 设置响应体
                                response.content().writeBytes(bytes);

                                // 写回响应
                                ctx.writeAndFlush(response);
                            }
                        });
                    }
                })
                .bind(8080);
    }
}

服务器负责处理请求并响应浏览器。所以只需要处理 HTTP 请求即可

// 服务器只处理HTTPRequest
ch.pipeline().addLast(new SimpleChannelInboundHandler<HttpRequest>()

获得请求后,需要返回响应给浏览器。需要创建响应对象 DefaultFullHttpResponse,设置 HTTP 版本号及状态码,为避免浏览器获得响应后,因为获得 CONTENT_LENGTH 而一直空转,需要添加 CONTENT_LENGTH 字段,表明响应体中数据的具体长度

// 获得完整响应,设置版本号与状态码
DefaultFullHttpResponse response = new DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK);
// 设置响应内容
byte[] bytes = "<h1>Hello, World!</h1>".getBytes(StandardCharsets.UTF_8);
// 设置响应体长度,避免浏览器一直接收响应内容
response.headers().setInt(CONTENT_LENGTH, bytes.length);
// 设置响应体
response.content().writeBytes(bytes);

运行结果

浏览器

file
控制台

// 请求内容
1714 [nioEventLoopGroup-2-2] DEBUG io.netty.handler.logging.LoggingHandler  - [id: 0x72630ef7, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:55503] READ: 688B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 47 45 54 20 2f 66 61 76 69 63 6f 6e 2e 69 63 6f |GET /favicon.ico|
|00000010| 20 48 54 54 50 2f 31 2e 31 0d 0a 48 6f 73 74 3a | HTTP/1.1..Host:|
|00000020| 20 6c 6f 63 61 6c 68 6f 73 74 3a 38 30 38 30 0d | localhost:8080.|
|00000030| 0a 43 6f 6e 6e 65 63 74 69 6f 6e 3a 20 6b 65 65 |.Connection: kee|
|00000040| 70 2d 61 6c 69 76 65 0d 0a 50 72 61 67 6d 61 3a |p-alive..Pragma:|
....

// 响应内容
1716 [nioEventLoopGroup-2-2] DEBUG io.netty.handler.logging.LoggingHandler  - [id: 0x72630ef7, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:55503] WRITE: 61B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 48 54 54 50 2f 31 2e 31 20 32 30 30 20 4f 4b 0d |HTTP/1.1 200 OK.|
|00000010| 0a 43 6f 6e 74 65 6e 74 2d 4c 65 6e 67 74 68 3a |.Content-Length:|
|00000020| 20 32 32 0d 0a 0d 0a 3c 68 31 3e 48 65 6c 6c 6f | 22....<h1>Hello|
|00000030| 2c 20 57 6f 72 6c 64 21 3c 2f 68 31 3e          |, World!</h1>   |
+--------+-------------------------------------------------+----------------+

4. 自定义协议

组成要素

  • 魔数:用来在第一时间判定接收的数据是否为无效数据包

  • 版本号:可以支持协议的升级

  • 序列化算法

    :消息正文到底采用哪种序列化反序列化方式

    • 如:json、protobuf、hessian、jdk
  • 指令类型:是登录、注册、单聊、群聊… 跟业务相关

  • 请求序号:为了双工通信,提供异步能力

  • 正文长度

  • 消息正文

编码器与解码器

public class MessageCodec extends ByteToMessageCodec<Message> {

    @Override
    protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {
        // 设置魔数 4个字节
        out.writeBytes(new byte[]{'N','Y','I','M'});
        // 设置版本号 1个字节
        out.writeByte(1);
        // 设置序列化方式 1个字节
        out.writeByte(1);
        // 设置指令类型 1个字节
        out.writeByte(msg.getMessageType());
        // 设置请求序号 4个字节
        out.writeInt(msg.getSequenceId());
        // 为了补齐为16个字节,填充1个字节的数据
        out.writeByte(0xff);

        // 获得序列化后的msg
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(bos);
        oos.writeObject(msg);
        byte[] bytes = bos.toByteArray();

        // 获得并设置正文长度 长度用4个字节标识
        out.writeInt(bytes.length);
        // 设置消息正文
        out.writeBytes(bytes);
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        // 获取魔数
        int magic = in.readInt();
        // 获取版本号
        byte version = in.readByte();
        // 获得序列化方式
        byte seqType = in.readByte();
        // 获得指令类型
        byte messageType = in.readByte();
        // 获得请求序号
        int sequenceId = in.readInt();
        // 移除补齐字节
        in.readByte();
        // 获得正文长度
        int length = in.readInt();
        // 获得正文
        byte[] bytes = new byte[length];
        in.readBytes(bytes, 0, length);
        ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
        Message message = (Message) ois.readObject();
		// 将信息放入List中,传递给下一个handler
        out.add(message);
        
        // 打印获得的信息正文
        System.out.println("===========魔数===========");
        System.out.println(magic);
        System.out.println("===========版本号===========");
        System.out.println(version);
        System.out.println("===========序列化方法===========");
        System.out.println(seqType);
        System.out.println("===========指令类型===========");
        System.out.println(messageType);
        System.out.println("===========请求序号===========");
        System.out.println(sequenceId);
        System.out.println("===========正文长度===========");
        System.out.println(length);
        System.out.println("===========正文===========");
        System.out.println(message);
    }
}

  • 编码器与解码器方法源于父类 ByteToMessageCodec,通过该类可以自定义编码器与解码器, 泛型类型为被编码与被解码的类。此处使用了自定义类 Message,代表消息
public class MessageCodec extends ByteToMessageCodec<Message>
  • 编码器负责将附加信息与正文信息写入到 ByteBuf 中,其中附加信息总字节数最好为 2n,不足需要补齐。正文内容如果为对象,需要通过序列化将其放入到 ByteBuf 中

  • 解码器负责将 ByteBuf 中的信息取出,并放入 List 中,该 List 用于将信息传递给下一个 handler

编写测试类

public class TestCodec {
    static final org.slf4j.Logger log = LoggerFactory.getLogger(StudyServer.class);
    public static void main(String[] args) throws Exception {
        EmbeddedChannel channel = new EmbeddedChannel();
        // 添加解码器,避免粘包半包问题
        channel.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 12, 4, 0, 0));
        channel.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
        channel.pipeline().addLast(new MessageCodec());
        LoginRequestMessage user = new LoginRequestMessage("Nyima", "123");

        // 测试编码与解码
        ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer();
        new MessageCodec().encode(null, user, byteBuf);
        channel.writeInbound(byteBuf);
    }
}
  • 测试类中用到了 LengthFieldBasedFrameDecoder,避免粘包半包问题
  • 通过 MessageCodec 的 encode 方法将附加信息与正文写入到 ByteBuf 中,通过 channel 执行入站操作。入站时会调用 decode 方法进行解码

运行结果

file

file

@Sharable 注解

为了提高 handler 的复用率,可以将 handler 创建为 handler 对象,然后在不同的 channel 中使用该 handler 对象进行处理操作

LoggingHandler loggingHandler = new LoggingHandler(LogLevel.DEBUG);
// 不同的channel中使用同一个handler对象,提高复用率
channel1.pipeline().addLast(loggingHandler);
channel2.pipeline().addLast(loggingHandler);

但是并不是所有的 handler 都能通过这种方法来提高复用率的,例如 LengthFieldBasedFrameDecoder。如果多个 channel 中使用同一个 LengthFieldBasedFrameDecoder 对象,则可能发生如下问题

  • channel1 中收到了一个半包,LengthFieldBasedFrameDecoder 发现不是一条完整的数据,则没有继续向下传播

  • 此时 channel2 中也收到了一个半包,因为两个 channel 使用了同一个 LengthFieldBasedFrameDecoder,存入其中的数据刚好拼凑成了一个完整的数据包。LengthFieldBasedFrameDecoder 让该数据包继续向下传播,最终引发错误

为了提高 handler 的复用率,同时又避免出现一些并发问题,Netty 中原生的 handler 中用 @Sharable 注解来标明,该 handler 能否在多个 channel 中共享。

只有带有该注解,才能通过对象的方式被共享,否则无法被共享

自定义编解码器能否使用 @Sharable 注解

这需要根据自定义的 handler 的处理逻辑进行分析

我们的 MessageCodec 本身接收的是 LengthFieldBasedFrameDecoder 处理之后的数据,那么数据肯定是完整的,按分析来说是可以添加 @Sharable 注解的

但是实际情况我们并不能添加该注解,会抛出异常信息 ChannelHandler cn.nyimac.study.day8.protocol.MessageCodec is not allowed to be shared

  • 因为 MessageCodec 继承自 ByteToMessageCodec,ByteToMessageCodec 类的注解如下

    file

这就意味着 ByteToMessageCodec 不能被多个 channel 所共享的

  • 原因:因为该类的目标是:将 ByteBuf 转化为 Message,意味着传进该 handler 的数据还未被处理过。所以传过来的 ByteBuf 可能并不是完整的数据,如果共享则会出现问题

如果想要共享,需要怎么办呢?

继承 MessageToMessageDecoder 即可。 该类的目标是:将已经被处理的完整数据再次被处理。传过来的 Message 如果是被处理过的完整数据,那么被共享也就不会出现问题了,也就可以使用 @Sharable 注解了。实现方式与 ByteToMessageCodec 类似

@ChannelHandler.Sharable
public class MessageSharableCodec extends MessageToMessageCodec<ByteBuf, Message> {
    @Override
    protected void encode(ChannelHandlerContext ctx, Message msg, List<Object> out) throws Exception {
        ...
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
		...
    }
}

本文由传智教育博学谷教研团队发布。

如果本文对您有帮助,欢迎关注点赞;如果您有任何建议也可留言评论私信,您的支持是我坚持创作的动力。

转载请注明出处!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/136615.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

第一章 R语言介绍

1.为何使用R 与起源于贝尔实验室的S语言类似&#xff0c;R也是一种为统计计算和绘图而生的语言和环境&#xff0c;它是一套开源的数据分析解决方案&#xff0c;由一个庞大且活跃的全球性研究型社区维护。但是&#xff0c;市面上也有许多其他流行的统计和制图软件&#xff0c;如…

NLP自然语言处理NLTK常用英文功能汇总

自然语言处理 (NLP) 是一门研究如何让计算机程序理解人类语言的学科。NLTK (Natural Language Toolkit) 是一个 Python 包,可以用于 NLP 的应用开发。 很多数据都是非结构化的,而且包含可以被人类读懂的文本。在用编程方式分析这些数据之前,我们需要对它们进行预处理。在本…

Allegro174版本新功能介绍之背景颜色设置

Allegro174版本新功能介绍之背景颜色设置 Allegro升级到了174的时候,打开的时候默认是黑色的背景,如下图 选择界面 工作界面 和166以及172版本不一样,174支持切换成白色的背景,具体操作如下 选择setup

Java--基础语法

文章目录一、输出hello world二、示例说明三、基本语法三、标识符规则四、注释一、输出hello world public class Helloworld {/*第一个java程序*输出Hello world!!!*/public static void main(String[] args) {//输出Hello world!!!System.out.println("Hello world!!!&…

如何使用Git同时绑定Github以及Gitee

今天接到一项任务&#xff0c;是需要clone一个github上面的项目&#xff0c;正兴高采烈的git clone的时候&#xff0c;git bash框框报错&#xff0c;突然一想&#xff0c;我貌似一直用的Gitee,绑定的也是Gitee,并没有绑定Github,于是就有了这篇博客记录如何使用Git同时绑定Gite…

CTF压轴题解题思路和过程

前言 压轴题难度极大。我在这里详细的记录一下解题思路和过程 题目初探 拿到题目&#xff0c;为三个文件&#xff0c;其中mem_secret-963a4663.vmem为常见内存镜像文件&#xff0c;另外两个文件格式未知。 使用volatility进行分析无法识别profile。 接着分析分析Encryption.…

探索云原生技术之容器编排引擎-Kubernetes/K8S详解(6)

❤️作者简介&#xff1a;2022新星计划第三季云原生与云计算赛道Top5&#x1f3c5;、华为云享专家&#x1f3c5;、云原生领域潜力新星&#x1f3c5; &#x1f49b;博客首页&#xff1a;C站个人主页&#x1f31e; &#x1f497;作者目的&#xff1a;如有错误请指正&#xff0c;将…

Day 19-Vue3 技术_其它

1.全局API的转移 Vue 2.x 有许多全局 API 和配置。例如&#xff1a;注册全局组件、注册全局指令等。 //注册全局组件 Vue.component(MyButton, {data: () > ({count: 0}),template: <button click"count">Clicked {{ count }} times.</button> })//注…

ESXI6.5.0安装部署

将ESXI6.5.0系统盘放入光驱&#xff0c;插入服务器&#xff0c;启动服务器&#xff1b; 进入服务器BIOS系统&#xff1b; 启动方式选择DVD&#xff1b; 进入ESXI6.5.0安装程序&#xff1b; 等待安装程序载入&#xff1b; 敲回车&#xff1b; 敲回车&#xff1b; …

【token】一.token的作用;二.Express中实现token的方法

目录 一.token的作用&#xff1a; 1.控制表单的重复提交&#xff1a;在表单中加入隐藏的表单控件&#xff0c;在这个隐藏的表单控件中带上token字符串。 2.身份验证&#xff1a;用来验证向服务器发起请求&#xff08;请求服务器的资源&#xff09;的用户是否是合法的用户。经…

如何理解 CRUD 与 REST

全文 2070 字 阅读时间约 6 分钟 本文首发于码匠技术博客​​​​​​​ 目录 什么是 CRUD&#xff1f; CRUD 的发展简史 CRUD 规则 什么是 REST&#xff1f; REST 的发展简史 REST 规则 CRUD VS REST 关于码匠 CRUD 和 REST 是应用开发领域中两个比较常见的概念&…

解决安装Tensorflow2: ERROR annot determine archive format of XXX保存问题

安装命令报错&#xff1a; ERROR: Cannot unpack file C:\Users\lenovo\AppData\Local\Temp\pip-unpack-mdiptqlf\simple.html (downloaded from C:\Users\lenovo\AppData\Local\Temp\pip-req-build-oq32e170, content-type: text/html); cannot detect archive format解决方法…

Barra模型因子的构建及应用(一)

一、摘要 Barra模型可以追溯至1974年&#xff0c;美国学者Barr Rosenberg对投资组合的风险和收益进行分析的多因子风险模型。随后Rosenberg成立了Barra&#xff0c;并针对美国权益市场提出了Barra USE1模型&#xff0c;现在已更新到USE4&#xff1b;而针对中国权益市场提出的B…

微服务架构下的可观测性

微服务架构下的可观测性 一、服务可观测性 传统架构下排查问题传统项目在出现异常或性能问题时&#xff0c;通常都是基于系统日志文件来排查。而在微服务分布式部署架构下&#xff0c;日志文件随微服务分散存储&#xff0c;对于排查问题工作量很大。传统监控告警平台也仅针对平…

痞子衡嵌入式:探讨i.MXRT下FlexSPI driver实现Flash编程时对于中断支持问题

大家好&#xff0c;我是痞子衡&#xff0c;是正经搞技术的痞子。今天痞子衡给大家介绍的是i.MXRT下FlexSPI driver实现Flash编程时对于中断支持问题。 前段时间有客户在官方社区反映 i.MXRT1170 下使用官方 SDK 里 FlexSPI 驱动去擦写 Flash 时不能很好地支持全局中断。客户项目…

内核解读之内存管理(2)内存管理三级架构之内存结点node

文章目录0、概述1、内存节点node0、概述 结合NUMA的架构&#xff0c;Linux抽象出了三级内存管理架构&#xff1a;内存节点node、内存区域zone和物理页框page。 在NUMA模型中&#xff0c;每个CPU都有自己的本地内存节点&#xff08;memory node&#xff09;&#xff0c;而且还…

qq录制视频保存到哪了?如何更改qq录屏存储位置

一、查看qq录制视频保存位置如果有录制视频的需求&#xff0c;相信大部分人都是使用qq自带的录屏功能来录制视频。那qq录屏后的视频在哪里去找&#xff1f;今天就给大家分享如何查看qq录制完的视频保存位置操作方法&#xff1a;第一步&#xff1a;电脑上登录qq&#xff0c;在qq…

Cadence PCB仿真 使用Allegro PCB SI为BRD文件创建通用型IBIS模型的方法图文教程

⏪《上一篇》   🏡《总目录》   ⏩《下一篇》 1,概述 本文简单介绍使用Allegro PCB SI软件为BRD PCB设计文件中的元器件创建IBIS模型的方法。 2,创建方法 第1步:确定打开PCB文件的软件是 Allegro PCB SI 如果不是Allegro PCB SI,可执行File→Change Editor…更换软…

尚医通-查询所有子节点-前端整合-更新医院状态(二十三)

目录&#xff1a; &#xff08;1&#xff09;医院管理-查询所有子节点接口 &#xff08;2&#xff09;医院列表-前端整合 &#xff08;3&#xff09;更新医院上线状态-功能实现 &#xff08;1&#xff09;医院管理-查询所有子节点接口 先做一个省的查询 在DictContrlller…

数据可视化大屏百度地图GPS轨迹位置感知状态开发实战案例解析(包含缩放控件、点线覆盖物、弹窗、标注图标分类功能)

系列文章目录 1.数据可视化大屏应急管理综合指挥调度系统完整案例详解&#xff08;PHP-API、Echarts、百度地图&#xff09; 2.数据可视化大屏百度地图API开发&#xff1a;停车场分布标注和检索静态版 3.百度地图高级开发&#xff1a;map.getDistance计算多点之间的距离并输入…