Netty代码阅读

news2024/9/24 5:25:57

阅读Netty官方文档的时候,提到了Netty主要有三大核心,分别是buffer、channel、Event Model,接下来我们就从阅读Netty代码来理解这三大核心。

示例程序

先给出示例程序,方便自己也方便读者进行debug调试。

Server端代码

# Server.java文件

package org.example;

public class Server {

    public static void main(String[] args) throws Exception {
        int port = 8080;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        }

        new DiscardServer(port).run(); // ref-1
    }

}

ref-1处的代码创建的DiscardServer对象如下所示。

// DiscardServer.java文件
package org.example;

import io.netty.bootstrap.ServerBootstrap;

import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class DiscardServer {

    private int port;

    public DiscardServer(int port) {
        this.port = port;
    }

    public void run() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new TimeEncoder(), new TimeServerHandler()); // ref-2
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG, 128)          
                    .childOption(ChannelOption.SO_KEEPALIVE, true);

            // Bind and start to accept incoming connections.
            ChannelFuture f = b.bind(port).sync();

            // Wait until the server socket is closed.
            // In this example, this does not happen, but you can do that to gracefully
            // shut down your server.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

}

ref-2处会将ChannelHandler的实现类TimeEncoder和TimeServerHandler的对象添加到pipeline的最后位置。这个两个类的代码如下所示:

// TimeEncoder.java
package org.example;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

public class TimeEncoder extends MessageToByteEncoder<UnixTime> {

    @Override
    protected void encode(ChannelHandlerContext ctx, UnixTime msg, ByteBuf out) {
        out.writeInt((int)msg.value()); // ref-3
    }

}
// TimeServerHandler.java文件
package org.example;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class TimeServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ChannelFuture f = ctx.writeAndFlush(new UnixTime());
        f.addListener(ChannelFutureListener.CLOSE);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

ref-3处代码就是将数据写入到ByteBuf中,后文会详细讲解。

Client端代码

package org.example;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class Client {

    public static void main(String[] args) throws Exception {
        String host = "127.0.0.1";
        int port = 8080;
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            Bootstrap b = new Bootstrap(); 
            b.group(workerGroup);
            b.channel(NioSocketChannel.class);
            b.option(ChannelOption.SO_KEEPALIVE, true); 
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler()); // ref-4
                }
            });

            // Start the client.
            ChannelFuture f = b.connect(host, port).sync(); 

            // Wait until the connection is closed.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }

}

ref-4处会将ChannelHandler的实现类TimeDecoder和TimeClientHandler的对象注册到pipeline的最后处,这两个类的实现代码如下所示:

package org.example;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

import java.util.List;

public class TimeDecoder extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        if (in.readableBytes() < 4) {
            return;
        }

        out.add(new UnixTime(in.readUnsignedInt())); // ref-5
    }
}

package org.example;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class TimeClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        UnixTime m = (UnixTime) msg;
        System.out.println(m);
        ctx.close();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

ref-5处的代码会从ByteBuf中读取数据,详细内容后文会解析。

运行Server和Client

先运行Server的main方法,然后运行Client的main方法,会得到如下的输出:

Sat Aug 24 15:06:11 CST 2024

ByteBuf解析

写入数据到ByteBuf

ref-3代码会向ByteBuf写入一个Int类型的数据,先看一下在抽象类ByteBuf中的申明:

// io.netty.buffer.ByteBuf.java文件
/**
 * Sets the specified 32-bit integer at the current {@code writerIndex}
 * and increases the {@code writerIndex} by {@code 4} in this buffer.
 * If {@code this.writableBytes} is less than {@code 4}, {@link #ensureWritable(int)}
 * will be called in an attempt to expand capacity to accommodate.
 */
public abstract ByteBuf writeInt(int value);

这个方法的大意就是会将32位的整数设置到当前写指针(writerIndex)的位置,并且将writerIndex增加4。如果可以写的空间少于4,那么就会调用ensureWritable(int)方法尝试扩大容量以容纳32位整数数据。

然后我们在看一下具体的实现:

// io.netty.buffer.AbstractByteBuf.java文件
@Override
public ByteBuf writeInt(int value) {
    ensureWritable0(4);
    _setInt(writerIndex, value);
    writerIndex += 4;
    return this;
}

在实现类中,这个写入32位整数的代码就是在完成上述申明中的步骤。先调用ensureWritable0(4)确保有足够的空间写入32位整数,然后调用_setInt(writerIndex, value)执行写入操作,最后将writerIndex增加4。

接下来我们追一下写入数据的步骤,如下所示:

// io.netty.buffer.PooledUnsafeDirectByteBuf.java文件
@Override
protected void _setInt(int index, int value) {
    UnsafeByteBufUtil.setInt(addr(index), value); // ref-6
}

继续跟一下,可以发现是调用了Java的Unsafe方法:

// io.netty.buffer.UnsafeByteBufUtil.java文件
static void setInt(long address, int value) {
    if (UNALIGNED) {
        PlatformDependent.putInt(address, BIG_ENDIAN_NATIVE_ORDER ? value : Integer.reverseBytes(value));
    } else {
        PlatformDependent.putByte(address, (byte) (value >>> 24));
        PlatformDependent.putByte(address + 1, (byte) (value >>> 16));
        PlatformDependent.putByte(address + 2, (byte) (value >>> 8));
        PlatformDependent.putByte(address + 3, (byte) value);
    }
}
// io.netty.util.internal.PlatformDependent.java文件
public static void putInt(long address, int value) {
    PlatformDependent0.putInt(address, value);
}
// io.netty.util.internal.PlatformDependent0.java文件
static void putInt(long address, int value) {
    UNSAFE.putInt(address, value);
}

我们看一下Unsafe类的说明,直接上jdk文档内容:


/**
 * A collection of methods for performing low-level, unsafe operations.
 * Although the class and all methods are public, use of this class is
 * limited because only trusted code can obtain instances of it.
 *
 * <em>Note:</em> It is the responsibility of the caller to make sure
 * arguments are checked before methods of this class are
 * called. While some rudimentary checks are performed on the input,
 * the checks are best effort and when performance is an overriding
 * priority, as when methods of this class are optimized by the
 * runtime compiler, some or all checks (if any) may be elided. Hence,
 * the caller must not rely on the checks and corresponding
 * exceptions!
 *
 * @author John R. Rose
 * @see #getUnsafe
 */

public final class Unsafe {
    ......
}

第一句话就说明了,这个类提供了一系列方法来执行底层的、不安全的操作。简单点说,就是这个类直接操作的内存。

ref-6 处有个细节,就是计算地址的方法调用addr(index),我们下面详细看一下:

// io.netty.buffer.PooledUnsafeDirectByteBuf.java文件
private long addr(int index) {
    return memoryAddress + index;
}

计算地址就是起始地址加一个偏移量index,这个index就是我们在上层传递的writerIndex。这儿就体现了写指针的作用,它就是记录数据已经写到哪个位置了,下一次写数据就从这个位置开始写。

从ByteBuf读取数据

写入数据分析完了,我们再分析一下读取数据。ref-5处的代码就是在从ByteBuf中读取数据in.readUnsignedInt(),我们先看一下这个方法的申明。

// io.netty.buffer.ByteBuf.java文件
/**
     * Gets an unsigned 32-bit integer at the current {@code readerIndex}
     * and increases the {@code readerIndex} by {@code 4} in this buffer.
     *
     * @throws IndexOutOfBoundsException
     *         if {@code this.readableBytes} is less than {@code 4}
     */
public abstract long  readUnsignedInt();

这个方法会在readerIndex位置读取32位的整数,然后将readerIndex增加4。

我们再看一下具体实现:

// 会进入到io.netty.buffer.AbstractByteBuf.java中的这个方法。
@Override
public int readInt() {
    checkReadableBytes0(4);
    int v = _getInt(readerIndex);
    readerIndex += 4;
    return v;
}

接下来看看_getInt(readerIndex)方法的调用:

// io.netty.buffer.PooledUnsafeDirectByteBuf.java
@Override
protected int _getInt(int index) {
    return UnsafeByteBufUtil.getInt(addr(index));
}

这个方法是不是很熟悉啊,和写入数据一样,都是先计算地址,再进行操作,底层也是依赖的Unsafe类。

到这儿也能体现出来readerIndex的作用了,它就是记录读取数据到哪儿了,然后下一次读取的时候就从readerIndex开始读取。

ByteBuf总结

结合ByteBuf类上的注释,对它进行一个总结。ByteBuf是底层byte数组或者java NIO Buffer的一个视图,它维护了两个指针,分别是读指针(readerIndex)和写指针(writerIndex),这两个指针分别记录读取和写入数据的位置。

具体示意图如下:

       +-------------------+------------------+------------------+
       | discardable bytes |  readable bytes  |  writable bytes  |
       |                   |     (CONTENT)    |                  |
       +-------------------+------------------+------------------+
       |                   |                  |                  |
       0      <=      readerIndex   <=   writerIndex    <=    capacity

这两个指针将对应byte数组分成了三个区域。readable bytes区域是实际存储数据的区域,writable bytes是需要填充的未定义区域,discardable bytes区域包含的是已经被读操作获取了的数据。

Channel解析

接下来我们看一下核心组件Channel,它代表的是与Socket或者有能力进行I/O操作的组件的连结,比如读、写、连接或者绑定。

Channel为用户提供如下能力:

  • 获取channel的当前状态。
  • 获取Channel的配置参数。
  • Channel支持的I/O操作。
  • ChannelPipeling会处理和channel相关的所有I/O事件和请求。

由于使用Netty时并不直接使用Channel,所以对于Channel的理解,目前就到这儿。

Event Model解析

用下面的图对Netty事件模型进行一个总结:

在这里插入图片描述

Java NIO负责处理客户端的请求,每来一个请求就会创建一个channel进行处理。

channel会附带一个channelPipeline,里面有添加的ChannelHandler。

EventLoop其实就是一个被封装了的线程池,ChannelHandler的执行就是在EventLoop中的线程上完成的。

总结

自己的水平有限,对于Netty的源码就只能分析到这儿了。

做个简单的总结,Netty底层是基于Java NIO的,在其上创造了三个重要的概念,(1)Channel,接收客户端请求的通道;(2)ByteBuf 对底层内存进行直接操作的缓冲区;(3)Event Model,主要是EventLoop对线程池的封装,还有对各个生命周期函数的调用。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2071175.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

mysql启动报错“本地计算机上的MySQL服务启动后停止。某些服务在未由其他服务或程序使用时将自动停止”

我删除&#xff08;手动删除&#xff09;完 binlog 文件后&#xff0c;重新启动mysql服务报错如下&#xff1a; 查看错误日志可以看到 某个 binlog 文件找不到 打开 binlog.index 可以看到里面引用的正是这个文件 解决方法&#xff1a; 要么手动修改 binlog.index 文件&#…

【C++ Primer Plus习题】4.6

问题: 解答: #include <iostream> using namespace std;typedef struct _CandyBar {string brand;float weight;int calorie; }CandyBar;int main() {CandyBar snack[3] { {"德芙",2.1,20},{"箭牌",2.2,16},{"阿尔卑斯",2.3,18}};for (i…

【GNSS接收机】开源导航接收机

Pocket SDR Pocket SDR是一款基于软件无线电&#xff08;SDR&#xff09;技术的开源GNSS&#xff08;全球导航卫星系统&#xff09;接收机。它由名为“Pocket SDR FE”的RF前端设备、设备的一些实用程序以及用Python、C和C编写的GNSS-SDR AP&#xff08;应用程序&#xff09;组…

linux死锁问题和cpu使用率过高问题排查

1、问题共同点 死锁问题和cpu使用率过高都是需要我们找出对应的问题线程。 死锁问题需要我们找出哪两个线程出现了死锁情况。 cpu使用率过高需要我们找出哪个或哪些线程占用了大量的cpu。 2、命令排查 2.1、查看机器上的Java进程 jcmd或 jps2.2、查看对应Java进程的线程级别…

全文发布|SmartX 金融行业跑批类业务场景探索与实践合集

经过多年在⾦融⾏业的积累和发展&#xff0c;SmartX 已经赢得了 300 多家⾦融⽤户的信任。覆盖了银⾏、保险、证券、基⾦、期货和信托等主要⾦融细分领域。在这个过程中&#xff0c;我们从最初的单⼀超融合⼚商&#xff08;⼩规模起步/快速交付/按需灵活扩容/降低总拥有成本&am…

【Hot100】LeetCode—236. 二叉树的最近公共祖先

目录 1- 思路递归 自底向上 2- 实现⭐236. 二叉树的最近公共祖先——题解思路 3- ACM 实现 题目连接&#xff1a;236. 二叉树的最近公共祖先 1- 思路 递归 自底向上 ① 自底向上的逻辑的话 需要采用后续遍历的方式&#xff0c;最后处理中间结点 ② 递归 2.1 参数和返回值…

Verilog刷题笔记60

题目&#xff1a; Exams/2013 q2bfsm Consider a finite state machine that is used to control some type of motor. The FSM has inputs x and y, which come from the motor, and produces outputs f and g, which control the motor. There is also a clock input called …

openGL文本渲染FreeType常见问题

这里写自定义目录标题 源码下载及编译编译生成的dll及lib使用FreeTypeinclude头文件加载附加包含目录 lib文件加载添加lib文件位置添加lib文件下的lib名 字体使用代码编写代码初始化中文字体输出简单封装 存在问题列表问题1&#xff1a;无法打开stddef.h其他问题后续更新 源码下…

[JAVA]初识线程池及其基本应用

并发是伴随着多核处理器的诞生而产生的&#xff0c;为了充分利用硬件资源&#xff0c;诞生了多线程技术。但是多线程又存在资源竞争的问题&#xff0c;引发了同步和互斥的问题&#xff0c;JDK1.5推出的java.util.concurrent(并发工具包来解决这些问题) 在Java并发包中一个最核心…

光影漫游者:展览空间的新选择—轻空间

在展览业迅速发展的今天&#xff0c;展览空间的设计和应用方式也在不断进化。光影漫游者作为一种创新的移动展馆&#xff0c;凭借其卓越的性能和灵活的应用方式&#xff0c;迅速崭露头角。与传统建筑展示空间和其他类型的移动展馆相比&#xff0c;光影漫游者展现出了多项显著优…

(javaweb)springboot的底层原理

目录 一.配置优先级 二.Bean的管理 1.获取bean ​编辑​编辑 2.bean作用域 3.第三方bean 三.SpringBoot原理 自动配置原理 原理分析&#xff1a; conditional&#xff1a; 自动配置案例&#xff1a;&#xff08;自定义starter分析&#xff09; 总结 一.配置优先级 //命…

leetcode 891. Sum of Subsequence Widths

原题链接 The width of a sequence is the difference between the maximum and minimum elements in the sequence. Given an array of integers nums, return the sum of the widths of all the non-empty subsequences of nums. Since the answer may be very large, retu…

Linux--NAT,代理服务,内网穿透

目录 1.NAT 技术 NAT IP转换过程 2. NAPT&#xff08;NAT转化表&#xff09; NAT 技术的缺陷 3.内网穿透&&内网打洞 内网穿透 内网打洞 两者差别 4.代理服务器 正向代理 反向代理 NAT 和代理服务器 1.NAT 技术 之前我们讨论了, IPv4 协议中, IP 地址数量不…

yd云手机登录算法分析

yd云手机登录算法分析 yd云手机登录算法分析第一步&#xff1a;抓包-登录第二步&#xff1a;定位加密入口第三步&#xff1a;分析加密算法第四步&#xff1a;算法实现 yd云手机登录算法分析 在这篇文章中&#xff0c;我们将详细解析yd云手机的登录算法&#xff0c;涵盖从抓包到…

SpringBoot整合Mybatis,Junit (复现之前写的一个SSM项目)

引言 如下是之前写的一个SSM项目&#xff08;纯注解版&#xff09;&#xff0c;现在我们要把它改造成一个SpringBoot项目&#xff0c;以体现SpringBoot的方便。主要需要关注的文件已经用红框标出。 1.config文件夹里面的是Spring&#xff0c;SpringMvc&#xff0c;Mybatis的配…

机械学习—零基础学习日志(如何理解概率论7)

这里需要先理解伯努利试验。只有A与A逆&#xff0c;两种结果。 正态分布 再来一道习题~&#xff1a; 解析&#xff1a; 《概率论与数理统计期末不挂科|考研零基础入门4小时完整版&#xff08;王志超&#xff09;》学习笔记 王志超老师 &#xff08;UP主&#xff09;

【Python学习手册(第四版)】学习笔记20.1-迭代和解析(二)-生成器函数、表达式详解

个人总结难免疏漏&#xff0c;请多包涵。更多内容请查看原文。本文以及学习笔记系列仅用于个人学习、研究交流。 本文会回顾迭代、列表解析、map&#xff0c;新学习生成器函数及其相关的生成器函数、表达式——这是用户定义的、按需产生结果的方式。较简单。 目录 回顾列表解…

软考-软件设计师(数据结构习题一)

&#x1f49d;&#x1f49d;&#x1f49d;欢迎来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 非常期待和您一起在这个小…

勇夺三项SOTA!北航爱诗科技联合发布灵活高效可控视频生成方法TrackGo!

论文链接&#xff1a;https://arxiv.org/pdf/2408.11475 项目链接&#xff1a;https://zhtjtcz.github.io/TrackGo-Page/ ★ 亮点直击 本文引入了一种新颖的运动可控视频生成方法&#xff0c;称为TrackGo。该方法为用户提供了一种灵活的运动控制机制&#xff0c;通过结合 masks…

qt-内置图片遍历-Lambda按钮

内置图片遍历-Lambda按钮 知识点widget.hwidget.cppmain.cpp运行图 知识点 使用新的connect语法连接信号和槽 --Lambda 使用 connect(btn, &QToolButton::clicked, this, [this, btn,index]() { onToolButtonClicked(btn); // Lambda表达式中调用成员函数&#xff0c;并传…