netty中channelHandler实现原理及最佳实践|极客星球

news2025/1/12 6:45:54

为持续夯实MobTech袤博科技的数智技术创新能力和技术布道能力,本期极客星球邀请了企业服务研发部工程师梁立从 TCP 的粘包/半包、 Netty 处理粘包/半包及源码分析、 开源项目对 channelHandler最佳实践三方面对《netty 中channelHandler的原理与最佳实践》进行了全面的技术分享。

版本信息

本次分析版本基于netty 4.1.40.Final

TCP 的粘包/半包问题

在TCP/IP 协议传输网络数据包时,用户发送消息ABCD,服务端可能收到是ABCD. AB?CD?等。对于粘包问题,主要原因是发送方每次写入数据小于套接字缓冲区大小, 以及接受方读取消息不及时。对于半包问题, 主要原因是发送方每次写入数据大于套接字缓冲区大小,以及发送数据大于协议最大传输单位,底层需要拆包。那么针对此类问题,应当如何解决呢 ?常见的方式解码方式有三种:固定长度,使用固定分隔符来分割消息,以及固网长度字段存放内容长度信息。

解码实现思考

在分析之前,我们可以思考一下,如果是我们来实现上面三种编解码会如何实现 ?

我们可以整理如下需求:

1.我们需要存放我们解码好的消息;

2.我们需要提供一个解码方法来让不同子类实现, 例如固定长度,分隔符,以及固定长度字段解码的方式肯定有差别;

3.我们从套接字读取消息后就可以让我们解码器去处理了。

针对上述需求,我们还需要带着三个问题,查看源码看下是否和我们猜想的类似:

问题1:我们需要一个集合存放我们解码的消息;

问题2:我们需要不同子类对解码细节做不同实现,所以我们需要有一个父类;ByteToMessageDecoder, 可以在父类实现公共逻辑,提供给子类一个decode(List out,ByteBuf in); 方法;

问题3 :我们从套接字读取数据之后,发送一个读事件(fireChannelRead)让我们解码器去处理。

Netty 处理粘包/半包及源码分析

封帧方式

解码

固定长度

FixedLengthFrameDecoder

分隔符

DelimiterBasedFrameDecoder

固定长度字段存内容长度信息

LengthFieldBasedFrameDecoder

我们以固定长度解码器为例:

ServerBootstrap b = new ServerBootstrap();// ....b..childHandler(new ChannelInitializer<SocketChannel>() {    @Override    public void initChannel(SocketChannel ch) throws Exception {        ChannelPipeline p = ch.pipeline();        p.addLast(new FixedLengthFrameDecoder(2));        //.... 后续业务处理handler           }});public class FixedLengthFrameDecoder extends ByteToMessageDecoder {    //....}public class ByteToMessageDecoder {    // ....    protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;}

我们查看 FixedLengthFrameDecoder ,发现果然继承父类ByteToMessageDecoder,然后父类也有一个channelRead方法处理消息,并提供一个decode抽象方法让子类实现。

channelRead

假设我们发送端发送ABCD消息,从套节字读取之后,后续会调用channelRead 方法进行解码。

我们看到获取一个集合实例CodecOutputList, 该类实现List接口。如果是首次调用,会把当前ByteBuf 赋值给cumulation,并调用callDecode(ctx, cumulation, out)。​​​​​​​

@Override    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        if (msg instanceof ByteBuf) {            CodecOutputList out = CodecOutputList.newInstance();            try {                ByteBuf data = (ByteBuf) msg;                first = cumulation == null;                if (first) {                    cumulation = data;                } else {                    cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);                }                callDecode(ctx, cumulation, out);            } catch (DecoderException e) {                throw e;            } catch (Exception e) {                throw new DecoderException(e);            } finally {              //.....            }        } else {            ctx.fireChannelRead(msg);        }    }

callDecode

通过字面意思就知道这个方法会做和解码相关操作。首先会判断in.isReadable() 是否可读,然后我们的outSize 目前是空, 进入到 decodeRemovalReentryProtection , 该方法会调用子类FixedLengthFrameDecoder的decode方法进行具体解码,该decode 方法比较简单就是当从ByteBuf 读取到指定长度就添加到out 中。我们读取完成后, outSize == out.size() 和 oldInputLength == in.readableBytes()都不满足,进入下一次循环, 我们outSize 大于0, 发送fireChannelRead。到此消息就被解码,并发送给我们业务channelHandler 。

 protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {    try {        while (in.isReadable()) {            int outSize = out.size();            if (outSize > 0) {                fireChannelRead(ctx, out, outSize);                out.clear();                // Check if this handler was removed before continuing with decoding.                // If it was removed, it is not safe to continue to operate on the buffer.                //                // See:                // - https://github.com/netty/netty/issues/4635                if (ctx.isRemoved()) {                    break;                }                outSize = 0;            }            int oldInputLength = in.readableBytes();            //decode中时,不能执行完handler remove清理操作。            //那decode完之后需要清理数据。            decodeRemovalReentryProtection(ctx, in, out);            // Check if this handler was removed before continuing the loop.            // If it was removed, it is not safe to continue to operate on the buffer.            //            // See https://github.com/netty/netty/issues/1664            if (ctx.isRemoved()) {                break;            }            if (outSize == out.size()) {                if (oldInputLength == in.readableBytes()) {                    break;                } else {                    continue;                }            }            if (oldInputLength == in.readableBytes()) {                throw new DecoderException(                    StringUtil.simpleClassName(getClass()) +                    ".decode() did not read anything but decoded a message.");            }            if (isSingleDecode()) {                break;            }        }    } catch (DecoderException e) {        throw e;    } catch (Exception cause) {        throw new DecoderException(cause);    }}final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)    throws Exception {    decodeState = STATE_CALLING_CHILD_DECODE;    try {        decode(ctx, in, out);    } finally {        boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING;        decodeState = STATE_INIT;        if (removePending) {            handlerRemoved(ctx);        }    }}public class FixedLengthFrameDecoder extends ByteToMessageDecoder {@Override    protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {        Object decoded = decode(ctx, in);        if (decoded != null) {            out.add(decoded);        }    }        protected Object decode(            @SuppressWarnings("UnusedParameters") ChannelHandlerContext ctx, ByteBuf in) throws Exception {        if (in.readableBytes() < frameLength) {            return null;        } else {            return in.readRetainedSlice(frameLength);        }    }}

channelHandler 的最佳实践

了解Netty 的小伙伴都知道channelHandler 分为ChannelInboundHandler 和 ChannelOutboundHandler, 分别用来处理inbound 和 outbound。

channelHandler 的最佳实践本质就是inbound 和outbound 的最佳实践。

下面列举了三种具有代表性的场景

• 按照职责划分channelHandler,例如有处理编解码,有处理心跳的,有专门处理业务的;

• 因为channel和eventLoop 线程绑定,然后一个evnetLoop 可能服务多个channel,所以我们不要在channelHandler 做耗时操作;

• outbound 我们可以优化写,减少系统调用。

按照职责划分channelHandler

rocketMq 

我们可以查看rocketMq 是如何划分channelHandler , 比如具有专门处理编/解码的NettyEncoder/NettyDecoder,通过IdleStatHandler 发现不活跃连接,管理连接handlerNettyConnectManageHandler 进行处理,

业务处理 NettyServerHandler 。

dubbo

处理编解码,检查不活跃channel,以及业务处理handler。

不在channelHandler 做耗时操作

之前介绍过一个eventLoop 线程服务多个channel,假设某个channelHandler处理耗时的任务,会影响其他channel,所以我们不要在channelHandler 执行耗时操作。

如果确实需要执行耗时操作,我们可以给channelHandler 添加一个线程池处理

​​​​​​​

final DefaultEventLoopGroup defaultEventLoopGroup = new DefaultEventLoopGroup();// 为我们的serverHandler 添加单独的线程池处理事件。pipeline.addLast(defaultEventLoopGroup,serverHandler);

outbound 优化写

writeAndFlush存在的问题

我们来看一下下面代码有什么问题?

public class EchoServerHandlerextends ChannelInboundHandlerAdapter {    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) {        ctx.writeAndFlush(msg);    }}

代码的问题在于ctx.writeAndFlush 每次调用都会触发一次系统调用。然后channelRead 在一次业务处理中可能被调用多次,问题就变为一次业务请求,执行多次系统调用。

优化writeAndFlush

怎么优化?

我们可以重写channelRead 和 channelReadComplete,在channelRead 中调用write 方法,

在channelReadComplete中调用flush 方法 。​​​​​​​

public class EchoServerHandler extends ChannelInboundHandlerAdapter {    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) {        ctx.write(msg);    }    @Override    public void channelReadComplete(ChannelHandlerContext ctx) {        ctx.flush();    }}

上面的实现方式确实减少系统调用,但是在netty 内部当有数据可读,会默认会连续16次,最后在调用channelReadComplete() 方法。

默认的行为存在两个问题:

1.写出数据到对端的时间被延迟了;

2.默认16 次这个数据不一定适合所有业务场景(不够灵活)。

我们需要结合业务的特性,例如业务如果关注吞吐量,可以适当把读取几次后刷新设置的大一些。如果业务关注及时性,读取几次后刷新就适当设置小一点。基于上述需求,FlushConsolidationHandler 就诞生了, 可以指定读取几次后刷新一次。

FlushConsolidationHandler 优化写

使用在pipeline中添加FlushConsolidationHandler,读取几次刷新一次可以根据业务设置,例如这里设置5次,我们是优化 EchoServerHandler的写,就放在它的前面。

// 每5次就触发一次flush  // ....p.addLast(new FlushConsolidationHandler(5));p.addLast(new EchoServerHandler());// ....public class EchoServerHandler extends ChannelInboundHandlerAdapter {    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) {        ctx.writeAndFlush(msg);    }}

 原理分析:

首先FlushConsolidationHandler 继承 ChannelDuplexHandler,能同时处理入站和出站消息,

入站我们查看 channelRead 和 channelReadComplete 实现,出站我们查看 flush 方法 (没有对write方法进行重写)。

channelRead

• 设置readInProgress 就把事件向下传递

• 我们的EchoServerHandler 会channelRead 会被调用,我们在channelRead 中调用ctx.writeAndFlush。

• 触发write 和 flush 的出站消息, FlushConsolidationHandler的flush进行处理

• 先判断readInProgress, ++flushPendingCount == explicitFlushAfterFlushes 判断是否达到期望刷新次数,我们设置为5 ,不执行刷新。

• 接着channelReadComplete 被调用,会重置准备刷新次数,并执行刷新。

关键就在channelRead 和 channelReadComplete

假设我们channelRead 读取了多次, 当读取次数大于等于5次就会刷新,小于5次时由channelReadComplete 刷新。

这样就达到了减少系统调用并且每读取几次在刷新也可以配置

public class FlushConsolidationHandler extends ChannelDuplexHandler {    // explicitFlushAfterFlushes 表示几次flush后,才真正调用flush 方法    // consolidateWhenNoReadInProgress 支持异步的情况,当readInProgress不为true 也可以支持flush    public FlushConsolidationHandler(int explicitFlushAfterFlushes, boolean consolidateWhenNoReadInProgress){        //....    }    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        readInProgress = true;        ctx.fireChannelRead(msg);    }    @Override    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {        // This may be the last event in the read loop, so flush now!        // 内部就是将 readInProgress = false; 当flushPendingCount 就调用flush        resetReadAndFlushIfNeeded(ctx);        ctx.fireChannelReadComplete();    }    @Override    public void flush(ChannelHandlerContext ctx) throws Exception {        //根据业务线程是否复用IO线程两种情况来考虑:        //复用情况        if (readInProgress) { //正在读的时候            // If there is still a read in progress we are sure we will see a channelReadComplete(...) call. Thus            // we only need to flush if we reach the explicitFlushAfterFlushes limit.            //每explicitFlushAfterFlushes个“批量”写(flush)一次            //不足怎么办?channelReadComplete会flush掉后面的            if (++flushPendingCount == explicitFlushAfterFlushes) {                flushNow(ctx);            }            //以下是非复用情况:异步情况        } else if (consolidateWhenNoReadInProgress) {            //(业务异步化情况下)开启consolidateWhenNoReadInProgress时,优化flush            //(比如没有读请求了,但是内部还是忙的团团转,没有消化的时候,所以还是会写响应)            // Flush immediately if we reach the threshold, otherwise schedule            if (++flushPendingCount == explicitFlushAfterFlushes) {                flushNow(ctx);            } else {                scheduleFlush(ctx);            }        } else {            //(业务异步化情况下)没有开启consolidateWhenNoReadInProgress时,直接flush            // Always flush directly            flushNow(ctx);        }    }}

附录

默认读取16次设置入口源码分析

默认创建DefaultChannelConfig ,会接着调用重载的构造函数。

在setRecvByteBufAllocator可以看到获取metadata.defaultMaxMessagesPerRead()。

而ChannelMetadata 默认构造为 16次 new ChannelMetadata(false, 16)。

public abstract class AbstractNioByteChannel extends AbstractNioChannel {        private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);    //.....} // 默认选择自适应接受缓存分配器,然后在调用setRecvByteBufAllocator。// setRecvByteBufAllocator就是指定最大读取多少次的入口 ,默认为16次 public class DefaultChannelConfig implements ChannelConfig {     public DefaultChannelConfig(Channel channel) {        //除UDP外都默认选择自适应接受缓存分配器        this(channel, new AdaptiveRecvByteBufAllocator());    }      protected DefaultChannelConfig(Channel channel, RecvByteBufAllocator allocator) {        //UDP的使用固定SIZE的接受缓存分配器:FixedRecvByteBufAllocator        setRecvByteBufAllocator(allocator, channel.metadata());        this.channel = channel;    }} private void setRecvByteBufAllocator(RecvByteBufAllocator allocator, ChannelMetadata metadata) {    if (allocator instanceof MaxMessagesRecvByteBufAllocator) {        ((MaxMessagesRecvByteBufAllocator) allocator).maxMessagesPerRead(metadata.defaultMaxMessagesPerRead());    } else if (allocator == null) {        throw new NullPointerException("allocator");    }    setRecvByteBufAllocator(allocator);}  public final class ChannelMetadata {     private final boolean hasDisconnect;    private final int defaultMaxMessagesPerRead;    // ....  }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/141698.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Ctfer训练计划】——(九)

作者名&#xff1a;Demo不是emo 主页面链接&#xff1a;主页传送门创作初心&#xff1a;舞台再大&#xff0c;你不上台&#xff0c;永远是观众&#xff0c;没人会关心你努不努力&#xff0c;摔的痛不痛&#xff0c;他们只会看你最后站在什么位置&#xff0c;然后羡慕或鄙夷座右…

Python+Selenium4元素定位_web自动化(3)

目录 0. 上节回顾 1. 八大定位 2. 定位器 3. CSS选择器 4. XPath选择器 4.1. XPath语法 4.2. XPath 函数 5. 相对定位 5.1 XPath 中的相对定位【重点】 5.1.1 相对路径 5.1.2 轴 5.2 selenium4 中的相对定位 总结 0. 上节回顾 浏览器的一般操作 浏览器的高级操作…

【sciter】:JSX 组件实现数据持久化

# 原理 组件数据持久化指的是:重新加载组件后,能否将重新加载前组件所存在的数据,在重新加载后数据依旧保存在组件中。 组件数据持久化实现原理:将每次更新组件数据同步到 Storage 中。并且监听组件重新加载(刷新),在刷新前将 Storage 关闭(确保数据不丢失)。当加载…

idea中添加git使用时文件不同颜色,标签不同颜色,代码不同颜色代表的含义

文章目录文件的颜色标签的颜色合并代码时不同颜色区块的含义文件的颜色 绿色——已经加入控制暂未提交&#xff1b; 红色——未加入版本控制&#xff1b;自己建立新文件后就是红色的&#xff0c;出现红色的一定要Add到git中&#xff0c;不然不能上传到远程仓库 蓝色——加入&am…

关于markdown相关语法的学习

众所周知&#xff0c;一个好的项目需要搭配一个好的项目说明&#xff0c;就行吃饺子需要蘸醋一样&#xff0c;没有醋的饺子&#xff0c;你仅仅吃掉了他的肉体&#xff0c;而得不到他的灵魂。下面开始吃饺子&#xff0c;不对&#xff0c;是开始学习markdown文件的基础语法&#…

在采购管理过程中使用技术有什么好处?

采购过程不总是简单直接的&#xff0c;人工采购过程非常耗费人力和时间&#xff0c;并且涉及大量文书工作。另一方面&#xff0c;当你在采购过程中使用技术时&#xff0c;比如使用SRM采购管理系统&#xff0c;会节省很多时间&#xff0c;使整个过程变得更加简单和轻松。 在讨…

Homekit智能家居创意DIY之智能吸顶灯

买灯要看什么因素 好灯具的灯光可以说是家居的“魔术师”&#xff0c;除了实用的照明功能外&#xff0c;对细节的把控也非常到位。那么该如何选到一款各方面合适的灯呢&#xff1f; 照度 可以简单理解为清晰度&#xff0c;复杂点套公式来说照度光通量&#xff08;亮度&#…

【达梦8】vm 虚拟机centos 7 安装达梦8 数据库

目录准备下载安装包版本选择安装前准备【登录root用户】创建用户【登录root用户】设置限制资源配置【登录dmdba用户】上传iso文件挂载iso创建安装路径开始安装【登录dmdba用户】安装【登录dmdba用户】初始化实例初始化注意事项开始初始化启动数据库启动方式1 &#xff08;推荐&…

Linux oom机制

Linux oom机制前言1 内存回收2 OOM基本原理2.1 虚拟内存OOM2.2 物理内存OOM3 oom配置参数3.1 panic_on_oom3.2 oom_kill_allocating_task3.3 oom_dump_tasks4 安卓LMK简介5 总结前言 Linux oom是由于内存泄漏或者内存使用不合理而导致的问题。 在讲OOM之前&#xff0c;我们先…

数据库,计算机网络、操作系统刷题笔记25

数据库&#xff0c;计算机网络、操作系统刷题笔记25 2022找工作是学历、能力和运气的超强结合体&#xff0c;遇到寒冬&#xff0c;大厂不招人&#xff0c;可能很多算法学生都得去找开发&#xff0c;测开 测开的话&#xff0c;你就得学数据库&#xff0c;sql&#xff0c;oracle…

Java实现栈结构

目录 一、栈概述 二、模拟实现栈 1、入栈 2、出栈 3、取栈顶元素 三、栈的应用 1、逆序打印链表 2、括号匹配问题 3、逆波兰表达式求值 4、栈的压入、弹出序列 5、最小栈 一、栈概述 栈&#xff08;Stack&#xff09;也是数据结构的一种&#xff0c;属于线性数…

Usaco Training刷怪旅 第三层 第五题:Wormholes

美国人出的题真的难&#xff08;个人感觉&#xff09;&#xff0c;看起来还行&#xff0c;做起来就是另外一会儿事儿了 Farmer Johns hobby of conducting high-energy physics experiments on weekends has backfired, causing N wormholes (2 < N < 12, N even) to ma…

基于物体基元空间几何特征的移动激光雷达点云街道树提取与分割

paper题目&#xff1a;Street Tree Extraction and Segmentation from Mobile LiDAR Point Clouds Based on Spatial Geometric Features of Object Primitives Abstract 从移动光探测与测距(LiDAR)点云中提取行道树仍然面临挑战&#xff0c;如在复杂的城市环境中提取精度低、…

浅谈哨兵机制的原理

文章目录哨兵机制的基本流程监控:主观下线&#xff1a;客观下线&#xff1a;选主&#xff1a;筛选&#xff1a;打分&#xff1a;通知&#xff1a;哨兵机制的基本流程 哨兵其实就是一个运行在特殊模式下的 Redis 进程&#xff0c;主从库实例运行的同时&#xff0c;它也在运行。…

el-input中放入elbutton

如图&#xff0c;如何在element组建的el-input的后缀放一个可点击的按钮或者标签 <el-input><el-button style"padding-right:10px" slot"suffix" type"text" >选择</el-button></el-input>在el-input的官网介绍中&…

jina实现并发扩展的调研

基于之前对于clip-as-service的调研&#xff0c;我在官方文档中看到横向扩展页面中的副本相关内容&#xff0c;可以解决并发问题&#xff0c;于是动手验证了一番 参考链接 &#xff08;官方文档&#xff09;link 官方文档的描述 首先我整了一个服务端 如果需要开启副本&…

线段相交判断

一、问题描述已知两条线段P1P2和Q1Q2&#xff0c;判断P1P2和Q1Q2是否相交&#xff0c;若相交&#xff0c;求出交点。两条线段的位置关系可以分为三类&#xff1a;[1] 有重合部分;[2] 无重合部分但有交点;[3] 无交点。注意&#xff1a;这里讨论的是两条线段是否相交&#xff0c;…

典型相关分析(附SPSS操作)

典型相关分析&#xff1a;研究两组变量&#xff08;每个变量中都可能有多个指标&#xff09;之间相关关系的一种多元统计方法。他能够揭示出两组变量之间的内在联系。选能较为综合、全面的衡量所在组的内在规律。一组变量最简单的综合形式就是该组变量的线性组合。典型相关分析…

还在用破-解版Navicat?有款纯Web化SQL开发工具,免安装还免费

经常使用SQL工具的开发者对Navicat一定都不陌生。这款软件作为一款全球化的多数据库管理工具&#xff0c;这些年逐步得到全国各地SQLer&#xff08;SQL开发者&#xff09;的关注。 与其他很多外来的软件产品一样&#xff0c;由于价格原因&#xff0c;很多SQLer感觉不太适合适应…

Maven(通用结构,集合了测试、打包、发布功能为一体)

Maven基础&#xff1a; 作用&#xff1a; 1.提供一套标准化的项目结构&#xff08;用于例如idea导入到eclipse或其他软件中&#xff0c;项目结构不会紊乱&#xff09; 2.提供一套标准化构建流程&#xff08;编译、测试、打包、发布...&#xff09;&#xff08;右键Maven-run…