Netty的解码器和编码器

news2025/1/1 23:44:01

链路图

一个完整的RPC请求中,netty对请求数据和响应数据的处理流程如下图所示

网络线路中传输的都是二进制数据,之后netty将二进制数据解码乘POJO对象,让客户端或者服务端程序处理。

解码的工具称为解码器,是一个入站处理器InBound。

编码的工具称为编码器,是一个处长处理器OutBound。

解码器

原理

解码器作为一个入站处理器,它需要将上一个入站处理器传过来的输入数据进行数据的编码或者格式转换,然后输出到下一站的入站处理器。

通常使用的ByteToMessageDecoder解码器将输入类型为ByteBuf缓冲区的数据进行解码,输出一个一个的POJO对象。

ByteToMessageDecoder是一个抽象类,继承关系如图

protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;

ByteToMessageDecoder使用了模板模式,只定义了解码的流程,具体的解码逻辑由子类完成。也就是开放了decode解码方法,由具体的解码器实现。

重申一下Netty对于handler的管理是通过通道pipeline完成的,所以解码器后面的处理器可以是业务处理器。

业务处理器接收解码结果,进行业务处理。

解码器中有一个比较重要的实现是ReplayingDecoder(也是一个抽象类),它在读取ByteBuf缓冲区的数据之前,需要检查缓冲区是否有足够的字节,如果缓冲区中字节足够,则会正常读取,反之,则会停止解码。等待下一次IO时间到来时再读取。

ReplayingDecoder在内部定义了一个新的二进制缓冲区类,对ByteBuf缓冲区进行了修饰,也就是ReplayingDecoderBuffer。

也就是说,继承ReplayingDecoder的子类解码器收到的二进制数据是经过ReplayingDecoderBuffer修饰过,判断过的。不是直接读取的ByteBuf中的数据。

ReplayingDecoder除了对ByteBuf数组的修饰以外,另一个作用,也更重要的作用是做分包传输。

我们知道底层通信协议是分包传输的。也就是我们预期的包大小和顺序可能和实际的并不一样,这时候就可以通过ReplayingDecoder来处理,ReplayingDecoder通过state属性来控制状态变化。比如如下sock鉴权解码器

public class SocksAuthRequestDecoder extends ReplayingDecoder<State> {

    private String username;

    public SocksAuthRequestDecoder() {
        super(State.CHECK_PROTOCOL_VERSION);
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> out) throws Exception {
        switch (state()) {
            case CHECK_PROTOCOL_VERSION: {
                if (byteBuf.readByte() != SocksSubnegotiationVersion.AUTH_PASSWORD.byteValue()) {
                    out.add(SocksCommonUtils.UNKNOWN_SOCKS_REQUEST);
                    break;
                }
                checkpoint(State.READ_USERNAME);
            }
            case READ_USERNAME: {
                int fieldLength = byteBuf.readByte();
                username = SocksCommonUtils.readUsAscii(byteBuf, fieldLength);
                checkpoint(State.READ_PASSWORD);
            }
            case READ_PASSWORD: {
                int fieldLength = byteBuf.readByte();
                String password = SocksCommonUtils.readUsAscii(byteBuf, fieldLength);
                out.add(new SocksAuthRequest(username, password));
                break;
            }
            default: {
                throw new Error();
            }
        }
        ctx.pipeline().remove(this);
    }

    @UnstableApi
    public enum State {
        CHECK_PROTOCOL_VERSION,
        READ_USERNAME,
        READ_PASSWORD
    }
}

以上是偏分阶段解码,适用于那些固定长度的数据,比如整型等,但对于字符串来说,可长可短,没有具体的长度限制。如果用ReplayingDecoder来实现

@Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> out) throws Exception {
        switch (state()) {
            case PARSE_1: {
                //基于Header-Content协议传输,Header中带有content长度,用一个int长度标识即可
                length = in.readInt();
                inBytes = new byte[];
                break;
            }
            case PARSE_2: {
                in.readBytes(inBytes,0,length);
                out.add(new String(inBytes,"UTF-8"));
            }
            default: {
                throw new Error();
            }
        }
        ctx.pipeline().remove(this);
    }

但其实对于比较复杂的业务场景中,不太建议使用ReplayingDecoder,主要原因是ReplayingDecoer在解析速度上相对较差,试想一下,replayingDecoder长度不够时,会停止解码。也就是说一个请求会被解码多次才可能最终完成。

对于字符串分包传输来说,更适合直接继承ByteToMessageDecoder基类来完成Header-Content协议的解析

@Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> out) throws Exception {
        if(buf.readableBytes()<4){
            //可读字节小于4,消息头还没读满,返回。(假设Header是一个int的数据
            return;        
        }
        buf.markReaderIndex();
        int length = buf.readInt();
        if(buf.readableBytes()<length){
            buf.resetReaderIndex();        
        }
        byte[] inBytes = new byte[length];
        buf.readBytes(inBytes,0,length);
        out.add(new String(inBytes,"UTF-8"));
    }

除了ByteToMessageDecoder这种将二进制数据转化为POJO对象的解码器以外,还有将一种POJO转为另一种POJO对象的解码器,MessageToMessageDecoder,不同的是,后者需要指明泛型类型。比如Integer转为String,这时候泛型类型为Integer。

Netty内置的开箱即用的Decoder

FixedLengthFrameDecoder-固定长度数据包解码器

他会把入站ByteBuf数据包拆分成一个个长度为n的数据包,然后发往下一个channelHandler入站处理器

LineBasedFrameDecoder-行分割数据包解码器

如果ByteBuf数据包使用换行符/回车符作为数据包的边界分隔符。这时他会把数据包按换行符/回车符拆分成一个个数据包。

有一个行最大长度限制,如果超过这个长度还没有发现分隔符,会抛出异常

DelimiterFrameDecoder-自定义分隔符数据包解码器

他会按照自定义分隔符将ByteBuf数据包进行拆分

LengthFieldBasedFrameDecoder-自定义长度数据包解码器

基于灵活长度的数据包,在ByteBuf数据包中,加了一个长度字段,保存了原始数据包长度,解码的时候,会按照这个长度进行原始数据包的提取。

一般基于Header-Content协议的数据包,都建议使用这个解码器

public class LengthFieldBasedFrameDecoder extends ByteToMessageDecoder {

   
    private final int maxFrameLength;        //发送的数据包最大长度
    private final int lengthFieldOffset;     //长度字段偏移量
    private final int lengthFieldLength;     //长度字段自己占用的字节数
    private final int lengthAdjustment;      //长度字段的偏移量矫正,比如长度后面还有两个字节用于存储别的信息,那么该值为2
    private final int initialBytesToStrip;   //丢弃的起始字节数
    ...
}

编码器

原理

所谓的编码器就是服务端应用程序处理完之后,一般会有一个响应结果Response。也就是一个Java POJO对象。需要将他编码为最终ByteBuf二进制类型。通过流水线写入到底层的Java通道。

上面说,解码器是一个入站处理器,那么编码器就是一个出站处理器。也就是OutboundHandler。处理逻辑为每个出站处理器会将上一个出站处理器的结果作为输入,经过处理后,传递给下一个出站处理器,直至最后写入Java通道。

由于出站处理器是从后向前执行的,所以第一个处理器一定是需要将结果处理成ByteBuf类型的数据。

MessageToByteEncoder同ByteToMessageDecoder一样都是一个抽象类,用模板模式。其中encode方法由子类实现。

在最后一步之前,可能会需要将一种POJO对象转成另一种POJO对象,就像解码器中的MessageToMessageDecoder一样,编码器也有同样的MessageToMessageEncoder解码器抽象类。

编解码器

所谓的编解码器也就是把解码器和编码器放在同一个类中,这个类就叫做ByteToMessageCodec,需要同时实现encode和decode方法。

不过这样的话,解码和编码的不同的代码就会出现在一个类中。出现逻辑混乱。Netty提供了另一种方式可以让编码代码和解码代码放在两个类,同时把编码工作和解码工作组合起来

编解码组合器

这个编解码组合器称为CombinedChanneldDuplexHandler组合器,比如客户端的编解码组合器就是用的这种方式

public final class HttpClientCodec extends CombinedChannelDuplexHandler<HttpResponseDecoder, HttpRequestEncoder>
        implements HttpClientUpgradeHandler.SourceCodec {
            
            ...
}

public class HttpResponseDecoder extends HttpObjectDecoder {
    ...
}

public abstract class HttpObjectDecoder extends ByteToMessageDecoder {
    
    private enum State {
        SKIP_CONTROL_CHARS,
        READ_INITIAL,
        READ_HEADER,
        READ_VARIABLE_LENGTH_CONTENT,
        READ_FIXED_LENGTH_CONTENT,
        READ_CHUNK_SIZE,
        READ_CHUNKED_CONTENT,
        READ_CHUNK_DELIMITER,
        READ_CHUNK_FOOTER,
        BAD_MESSAGE,
        UPGRADED
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {
        if (resetRequested) {
            resetNow();
        }

        switch (currentState) {
        case SKIP_CONTROL_CHARS:
            // Fall-through
        case READ_INITIAL: try {
            AppendableCharSequence line = lineParser.parse(buffer);
            if (line == null) {
                return;
            }
            String[] initialLine = splitInitialLine(line);
            if (initialLine.length < 3) {
                // Invalid initial line - ignore.
                currentState = State.SKIP_CONTROL_CHARS;
                return;
            }

            message = createMessage(initialLine);
            currentState = State.READ_HEADER;
            // fall-through
        } catch (Exception e) {
            out.add(invalidMessage(buffer, e));
            return;
        }
        case READ_HEADER: try {
            State nextState = readHeaders(buffer);
            if (nextState == null) {
                return;
            }
            currentState = nextState;
            switch (nextState) {
            case SKIP_CONTROL_CHARS:
                // fast-path
                // No content is expected.
                out.add(message);
                out.add(LastHttpContent.EMPTY_LAST_CONTENT);
                resetNow();
                return;
            case READ_CHUNK_SIZE:
                if (!chunkedSupported) {
                    throw new IllegalArgumentException("Chunked messages not supported");
                }
                // Chunked encoding - generate HttpMessage first.  HttpChunks will follow.
                out.add(message);
                return;
            default:
                /**
                 * <a href="https://tools.ietf.org/html/rfc7230#section-3.3.3">RFC 7230, 3.3.3</a> states that if a
                 * request does not have either a transfer-encoding or a content-length header then the message body
                 * length is 0. However for a response the body length is the number of octets received prior to the
                 * server closing the connection. So we treat this as variable length chunked encoding.
                 */
                long contentLength = contentLength();
                if (contentLength == 0 || contentLength == -1 && isDecodingRequest()) {
                    out.add(message);
                    out.add(LastHttpContent.EMPTY_LAST_CONTENT);
                    resetNow();
                    return;
                }

                assert nextState == State.READ_FIXED_LENGTH_CONTENT ||
                        nextState == State.READ_VARIABLE_LENGTH_CONTENT;

                out.add(message);

                if (nextState == State.READ_FIXED_LENGTH_CONTENT) {
                    // chunkSize will be decreased as the READ_FIXED_LENGTH_CONTENT state reads data chunk by chunk.
                    chunkSize = contentLength;
                }

                // We return here, this forces decode to be called again where we will decode the content
                return;
            }
        } catch (Exception e) {
            out.add(invalidMessage(buffer, e));
            return;
        }
        case READ_VARIABLE_LENGTH_CONTENT: {
            // Keep reading data as a chunk until the end of connection is reached.
            int toRead = Math.min(buffer.readableBytes(), maxChunkSize);
            if (toRead > 0) {
                ByteBuf content = buffer.readRetainedSlice(toRead);
                out.add(new DefaultHttpContent(content));
            }
            return;
        }
        case READ_FIXED_LENGTH_CONTENT: {
            int readLimit = buffer.readableBytes();

            // Check if the buffer is readable first as we use the readable byte count
            // to create the HttpChunk. This is needed as otherwise we may end up with
            // create an HttpChunk instance that contains an empty buffer and so is
            // handled like it is the last HttpChunk.
            //
            // See https://github.com/netty/netty/issues/433
            if (readLimit == 0) {
                return;
            }

            int toRead = Math.min(readLimit, maxChunkSize);
            if (toRead > chunkSize) {
                toRead = (int) chunkSize;
            }
            ByteBuf content = buffer.readRetainedSlice(toRead);
            chunkSize -= toRead;

            if (chunkSize == 0) {
                // Read all content.
                out.add(new DefaultLastHttpContent(content, validateHeaders));
                resetNow();
            } else {
                out.add(new DefaultHttpContent(content));
            }
            return;
        }
        /**
         * everything else after this point takes care of reading chunked content. basically, read chunk size,
         * read chunk, read and ignore the CRLF and repeat until 0
         */
        case READ_CHUNK_SIZE: try {
            AppendableCharSequence line = lineParser.parse(buffer);
            if (line == null) {
                return;
            }
            int chunkSize = getChunkSize(line.toString());
            this.chunkSize = chunkSize;
            if (chunkSize == 0) {
                currentState = State.READ_CHUNK_FOOTER;
                return;
            }
            currentState = State.READ_CHUNKED_CONTENT;
            // fall-through
        } catch (Exception e) {
            out.add(invalidChunk(buffer, e));
            return;
        }
        case READ_CHUNKED_CONTENT: {
            assert chunkSize <= Integer.MAX_VALUE;
            int toRead = Math.min((int) chunkSize, maxChunkSize);
            if (!allowPartialChunks && buffer.readableBytes() < toRead) {
                return;
            }
            toRead = Math.min(toRead, buffer.readableBytes());
            if (toRead == 0) {
                return;
            }
            HttpContent chunk = new DefaultHttpContent(buffer.readRetainedSlice(toRead));
            chunkSize -= toRead;

            out.add(chunk);

            if (chunkSize != 0) {
                return;
            }
            currentState = State.READ_CHUNK_DELIMITER;
            // fall-through
        }
        case READ_CHUNK_DELIMITER: {
            final int wIdx = buffer.writerIndex();
            int rIdx = buffer.readerIndex();
            while (wIdx > rIdx) {
                byte next = buffer.getByte(rIdx++);
                if (next == HttpConstants.LF) {
                    currentState = State.READ_CHUNK_SIZE;
                    break;
                }
            }
            buffer.readerIndex(rIdx);
            return;
        }
        case READ_CHUNK_FOOTER: try {
            LastHttpContent trailer = readTrailingHeaders(buffer);
            if (trailer == null) {
                return;
            }
            out.add(trailer);
            resetNow();
            return;
        } catch (Exception e) {
            out.add(invalidChunk(buffer, e));
            return;
        }
        case BAD_MESSAGE: {
            // Keep discarding until disconnection.
            buffer.skipBytes(buffer.readableBytes());
            break;
        }
        case UPGRADED: {
            int readableBytes = buffer.readableBytes();
            if (readableBytes > 0) {
                // Keep on consuming as otherwise we may trigger an DecoderException,
                // other handler will replace this codec with the upgraded protocol codec to
                // take the traffic over at some point then.
                // See https://github.com/netty/netty/issues/2173
                out.add(buffer.readBytes(readableBytes));
            }
            break;
        }
        default:
            break;
        }
    }
    ...
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1408732.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

分布式应用程序设计项目管理

1. 项目的定义 项目是一种特定的、新颖的行动&#xff0c;目的是以有条不紊、逐步的方式构建一个尚未存在确切对应物的未来现实。它是对精心制定的需求的回应&#xff0c;旨在满足业主的需要。项目包括一个可能是物理或智力的目标&#xff0c;并且需要使用给定的资源来执行一系…

Gold-YOLO(NeurIPS 2023)论文与代码解析

paper&#xff1a;Gold-YOLO: Efficient Object Detector via Gather-and-Distribute Mechanism official implementation&#xff1a;https://github.com/huawei-noah/Efficient-Computing/tree/master/Detection/Gold-YOLO 存在的问题 在过去几年里&#xff0c;YOLO系列已经…

某马头条——day10

热文章数据查询 分布式任务调度xxl-job 概述 环境搭建 docker化部署 docker run -p 3306:3306 --name mysql57 \ -v /opt/mysql/conf:/etc/mysql \ -v /opt/mysql/logs:/var/log/mysql \ -v /opt/mysql/data:/var/lib/mysql \ -e MYSQL_ROOT_PASSWORDroot\ -d mysql:5.7 dock…

uniapp开发过程一些小坑

问题1、uniapp使用scroll-view的:scroll-into-view“lastChatData“跳到某个元素id时候&#xff0c;在app上不生效&#xff0c;小程序没问题 使用this.$nextTick或者 setTimeout(()>{that.lastChatData 元素id },500) 进行延后处理就可以了。 问题2&#xff1a;uniapp开…

NE8实现HTTP Upgrade和HTTP CONNECT代理服务器

看到一个文章[Go] 不到 100 行代码实现一个支持 CONNECT 动词的 HTTP 服务器 在NET8中如何实现 创建项目为MiniApi 编辑Program.cs文件。 var builder WebApplication.CreateSlimBuilder(args);var app builder.Build();// 将HTTP请求通过协议升级机制转为远程TCP请求&…

02 分解质因子

一、数n的质因子分解 题目描述&#xff1a; 输入一个数n&#xff08;n<10^6&#xff09;,将数n分解质因数&#xff0c;并按照质因数从小到大的顺序输出每个质因数的底数和指数。 输入 5 输出 5 1 输入 10 输出 2 1 5 1 朴素解法&#xff1a; 首先求出1~n的所有质数…

༺༽༾ཊ—Unity之-02-简单工厂模式—ཏ༿༼༻

首先我们打开一个项目 在这个初始界面我们需要做一些准备工作 建基础通用包 创建一个Plane 重置后 缩放100倍 加一个颜色 任务&#xff1a;使用【简单工厂模式】生成四种不同怪物 【按不同路径移动】 首先资源商店下载四个怪物模型 接下来我们选取四个怪物作为预制体并分别起名…

Git 入门精讲

我们为什么要学习git&#xff1f; 就当下的发展而言&#xff0c;只要你从事开发就一定会接触git。作为最强大的分布式版本控制器&#xff0c;git 与 svn 有着本质上的区别。 Git是一种分布式版本控制系统&#xff0c;每个开发者都可以在本地维护完整的代码库&#xff0c;可以离…

第21课 在Android Native开发中架起java与c++互通的桥梁

在开始本节课&#xff0c;我尝试把项目拷贝到另一台电脑上以便继续工作&#xff0c;但出现了大量的“could not be resolved”问题&#xff0c;尝试包含新的include路径也无法解决该问题&#xff0c;最后删除了项目的Native Support&#xff0c;然后重新添加Native Support才解…

HarmonyOS鸿蒙学习基础篇 - Text文本组件

该组件从API Version 7开始支持。后续版本如有新增内容&#xff0c;则采用上角标单独标记该内容的起始版本。 Text文本组件是可以显示一段文本的组件。该组件从API Version 7开始支持&#xff0c;从API version 9开始&#xff0c;该接口支持在ArkTS卡片中使用。 子组件 可…

【时间序列篇】基于LSTM的序列分类-Pytorch实现 part2 自有数据集构建

系列文章目录 【时间序列篇】基于LSTM的序列分类-Pytorch实现 part1 案例复现 【时间序列篇】基于LSTM的序列分类-Pytorch实现 part2 自有数据集构建 【时间序列篇】基于LSTM的序列分类-Pytorch实现 part3 化为己用 在一个人体姿态估计的任务中&#xff0c;需要用深度学习模型…

上门服务小程序|预约上门服务系统开发有哪些功能?

在现代快节奏的生活中&#xff0c;压力和疲劳常常困扰着我们。为了缓解这种状况&#xff0c;越来越多的人选择去按摩店进行放松。然而&#xff0c;繁忙的工作和家庭责任往往让我们无法抽出时间去按摩店。在这种情况下&#xff0c;上门按摩服务应运而生。而随着科技的发展&#…

VI / VIM的使用

vi/vim 的区别简单点来说&#xff0c;它们都是多模式编辑器&#xff0c;不同的是 vim 是 vi 的升级版本&#xff0c;它不仅兼容 vi 的所有指令&#xff0c;而且 还有一些新的特性在里面。例如语法加亮&#xff0c;可视化操作不仅可以在终端运行&#xff0c;也可以运行于 x win…

SpringMVC第二天

今日内容 能够掌握SSM整合的流程 能够编写SSM整合功能模块类 能够使用Result统一表现层响应结果 能够编写异常处理器进行项目异常 能够完成SSM整合前端页面发送请求实现增删改查操作 能够编写拦截器并配置拦截器 一、SSM整合【重点】 1 SSM整合配置 问题导入 请描述“SSM整…

IntelliJ IDE 插件开发 | (五)VFS 与编辑器

系列文章 IntelliJ IDE 插件开发 |&#xff08;一&#xff09;快速入门IntelliJ IDE 插件开发 |&#xff08;二&#xff09;UI 界面与数据持久化IntelliJ IDE 插件开发 |&#xff08;三&#xff09;消息通知与事件监听IntelliJ IDE 插件开发 |&#xff08;四&#xff09;来查收…

Windows AD 组策略 通过脚本修改管理员密码:以安全方式

因为本文主要讲的是通过脚本如何以安全方式设置密码&#xff0c;所以关于组策略如何设置请参考这里&#xff1a; WinServer 2019 AD 组策略 启用本地管理员账号&#xff0c;重置密码_ad域命令启用administrator账户-CSDN博客 我们首先要讲一下&#xff0c;以一般方法创建的脚…

力扣1027. 最长等差数列

动态规划 思路&#xff1a; 可以参考力扣1218. 最长定差子序列目前不清楚公差&#xff0c;可以将序列最大最小值找到&#xff0c;公差的范围是 [-(max - min), (max - min)]&#xff0c;按公差递增迭代遍历求出最长等差数列&#xff1b; class Solution { public:int longest…

写一份简单的产品说明书:格式和排版建议

现在的市场竞争那么激烈&#xff0c;拥有一份简洁明了的产品说明书可以说是很重要的。产品说明书不仅向用户提供了对产品的详细了解&#xff0c;还能够树立品牌形象&#xff0c;提升用户体验。 | 一、写一份简单的产品说明书—一些建议 1.创意封面设计 一个吸引人的封面设计能…

浅析混沌工程的主要概念和作用

混沌工程是一种系统设计和测试方法&#xff0c;旨在通过有目的地在生产环境中引入故障来发现和解决系统中的潜在问题。通过模拟故障和持续测试&#xff0c;有助于发现和解决系统中的潜在问题&#xff0c;提高系统的可靠性、弹性和安全性。 故障引入&#xff1a; 混沌工程通过故…

for循环延时时间计算

​ 文章目录 前言一、计算方式二、for循环 2.1 for循环里定义变量2.2 codeblock设置C99标准 三、四、总结 前言 之前做led点亮的实验&#xff0c;好像是被delay函数影响了&#xff0c;因为delay参数设置的不对&#xff0c;led没有正常闪烁。现在就想搞明白一些。 一、计算…