由浅入深Netty代码调优

news2025/1/16 3:36:06

目录

  • 1. 优化
    • 1.1 扩展序列化算法
  • 2 参数调优
    • 2.1 CONNECT_TIMEOUT_MILLIS
    • 2.2 SO_BACKLOG
    • 2.3 ulimit -n
    • 2.4 TCP_NODELAY
    • 2.5 SO_SNDBUF & SO_RCVBUF
    • 2.6 ALLOCATOR
    • 2.7 RCVBUF_ALLOCATOR


1. 优化

在这里插入图片描述

1.1 扩展序列化算法

序列化,反序列化主要用在消息正文的转换上

  • 序列化时,需要将 Java 对象变为要传输的数据(可以是 byte[],或 json 等,最终都需要变成 byte[])
  • 反序列化时,需要将传入的正文数据还原成 Java 对象,便于处理

目前的代码仅支持 Java 自带的序列化,反序列化机制,核心代码如下

// 反序列化
byte[] body = new byte[bodyLength];
byteByf.readBytes(body);
ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(body));
Message message = (Message) in.readObject();
message.setSequenceId(sequenceId);

// 序列化
ByteArrayOutputStream out = new ByteArrayOutputStream();
new ObjectOutputStream(out).writeObject(message);
byte[] bytes = out.toByteArray();

为了支持更多序列化算法,抽象一个 Serializer 接口

public interface Serializer {

    // 反序列化方法
    <T> T deserialize(Class<T> clazz, byte[] bytes);

    // 序列化方法
    <T> byte[] serialize(T object);

}

提供两个实现,我这里直接将实现加入了枚举类 Serializer.Algorithm 中

enum SerializerAlgorithm implements Serializer {
	// Java 实现
    Java {
        @Override
        public <T> T deserialize(Class<T> clazz, byte[] bytes) {
            try {
                ObjectInputStream in = 
                    new ObjectInputStream(new ByteArrayInputStream(bytes));
                Object object = in.readObject();
                return (T) object;
            } catch (IOException | ClassNotFoundException e) {
                throw new RuntimeException("SerializerAlgorithm.Java 反序列化错误", e);
            }
        }

        @Override
        public <T> byte[] serialize(T object) {
            try {
                ByteArrayOutputStream out = new ByteArrayOutputStream();
                new ObjectOutputStream(out).writeObject(object);
                return out.toByteArray();
            } catch (IOException e) {
                throw new RuntimeException("SerializerAlgorithm.Java 序列化错误", e);
            }
        }
    }, 
    // Json 实现(引入了 Gson 依赖)
    Json {
        @Override
        public <T> T deserialize(Class<T> clazz, byte[] bytes) {
            return new Gson().fromJson(new String(bytes, StandardCharsets.UTF_8), clazz);
        }

        @Override
        public <T> byte[] serialize(T object) {
            return new Gson().toJson(object).getBytes(StandardCharsets.UTF_8);
        }
    };

    // 需要从协议的字节中得到是哪种序列化算法
    public static SerializerAlgorithm getByInt(int type) {
        SerializerAlgorithm[] array = SerializerAlgorithm.values();
        if (type < 0 || type > array.length - 1) {
            throw new IllegalArgumentException("超过 SerializerAlgorithm 范围");
        }
        return array[type];
    }
}

增加配置类和配置文件

public abstract class Config {
    static Properties properties;
    static {
        try (InputStream in = Config.class.getResourceAsStream("/application.properties")) {
            properties = new Properties();
            properties.load(in);
        } catch (IOException e) {
            throw new ExceptionInInitializerError(e);
        }
    }
    public static int getServerPort() {
        String value = properties.getProperty("server.port");
        if(value == null) {
            return 8080;
        } else {
            return Integer.parseInt(value);
        }
    }
    public static Serializer.Algorithm getSerializerAlgorithm() {
        String value = properties.getProperty("serializer.algorithm");
        if(value == null) {
            return Serializer.Algorithm.Java;
        } else {
            return Serializer.Algorithm.valueOf(value);
        }
    }
}

配置文件

serializer.algorithm=Json

修改编解码器

/**
 * 必须和 LengthFieldBasedFrameDecoder 一起使用,确保接到的 ByteBuf 消息是完整的
 */
public class MessageCodecSharable extends MessageToMessageCodec<ByteBuf, Message> {
    @Override
    public void encode(ChannelHandlerContext ctx, Message msg, List<Object> outList) throws Exception {
        ByteBuf out = ctx.alloc().buffer();
        // 1. 4 字节的魔数
        out.writeBytes(new byte[]{1, 2, 3, 4});
        // 2. 1 字节的版本,
        out.writeByte(1);
        // 3. 1 字节的序列化方式 jdk 0 , json 1
        out.writeByte(Config.getSerializerAlgorithm().ordinal());
        // 4. 1 字节的指令类型
        out.writeByte(msg.getMessageType());
        // 5. 4 个字节
        out.writeInt(msg.getSequenceId());
        // 无意义,对齐填充
        out.writeByte(0xff);
        // 6. 获取内容的字节数组
        byte[] bytes = Config.getSerializerAlgorithm().serialize(msg);
        // 7. 长度
        out.writeInt(bytes.length);
        // 8. 写入内容
        out.writeBytes(bytes);
        outList.add(out);
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        int magicNum = in.readInt();
        byte version = in.readByte();
        byte serializerAlgorithm = in.readByte(); // 0 或 1
        byte messageType = in.readByte(); // 0,1,2...
        int sequenceId = in.readInt();
        in.readByte();
        int length = in.readInt();
        byte[] bytes = new byte[length];
        in.readBytes(bytes, 0, length);

        // 找到反序列化算法
        Serializer.Algorithm algorithm = Serializer.Algorithm.values()[serializerAlgorithm];
        // 确定具体消息类型
        Class<? extends Message> messageClass = Message.getMessageClass(messageType);
        Message message = algorithm.deserialize(messageClass, bytes);
//        log.debug("{}, {}, {}, {}, {}, {}", magicNum, version, serializerType, messageType, sequenceId, length);
//        log.debug("{}", message);
        out.add(message);
    }
}

其中确定具体消息类型,可以根据 消息类型字节 获取到对应的 消息 class

@Data
public abstract class Message implements Serializable {

    /**
     * 根据消息类型字节,获得对应的消息 class
     * @param messageType 消息类型字节
     * @return 消息 class
     */
    public static Class<? extends Message> getMessageClass(int messageType) {
        return messageClasses.get(messageType);
    }

    private int sequenceId;

    private int messageType;

    public abstract int getMessageType();

    public static final int LoginRequestMessage = 0;
    public static final int LoginResponseMessage = 1;
    public static final int ChatRequestMessage = 2;
    public static final int ChatResponseMessage = 3;
    public static final int GroupCreateRequestMessage = 4;
    public static final int GroupCreateResponseMessage = 5;
    public static final int GroupJoinRequestMessage = 6;
    public static final int GroupJoinResponseMessage = 7;
    public static final int GroupQuitRequestMessage = 8;
    public static final int GroupQuitResponseMessage = 9;
    public static final int GroupChatRequestMessage = 10;
    public static final int GroupChatResponseMessage = 11;
    public static final int GroupMembersRequestMessage = 12;
    public static final int GroupMembersResponseMessage = 13;
    public static final int PingMessage = 14;
    public static final int PongMessage = 15;
    private static final Map<Integer, Class<? extends Message>> messageClasses = new HashMap<>();

    static {
        messageClasses.put(LoginRequestMessage, LoginRequestMessage.class);
        messageClasses.put(LoginResponseMessage, LoginResponseMessage.class);
        messageClasses.put(ChatRequestMessage, ChatRequestMessage.class);
        messageClasses.put(ChatResponseMessage, ChatResponseMessage.class);
        messageClasses.put(GroupCreateRequestMessage, GroupCreateRequestMessage.class);
        messageClasses.put(GroupCreateResponseMessage, GroupCreateResponseMessage.class);
        messageClasses.put(GroupJoinRequestMessage, GroupJoinRequestMessage.class);
        messageClasses.put(GroupJoinResponseMessage, GroupJoinResponseMessage.class);
        messageClasses.put(GroupQuitRequestMessage, GroupQuitRequestMessage.class);
        messageClasses.put(GroupQuitResponseMessage, GroupQuitResponseMessage.class);
        messageClasses.put(GroupChatRequestMessage, GroupChatRequestMessage.class);
        messageClasses.put(GroupChatResponseMessage, GroupChatResponseMessage.class);
        messageClasses.put(GroupMembersRequestMessage, GroupMembersRequestMessage.class);
        messageClasses.put(GroupMembersResponseMessage, GroupMembersResponseMessage.class);
    }
}

2 参数调优

2.1 CONNECT_TIMEOUT_MILLIS

  • 属于 SocketChannal 参数

  • 用在客户端建立连接时,如果在指定毫秒内无法连接,会抛出 timeout 异常

  • SO_TIMEOUT 主要用在阻塞 IO,阻塞 IO 中 accept,read 等都是无限等待的,如果不希望永远阻塞,使用它调整超时时间

@Slf4j
public class TestConnectionTimeout {
    public static void main(String[] args) {
        NioEventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap()
                    .group(group)
                    .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 300)
                    .channel(NioSocketChannel.class)
                    .handler(new LoggingHandler());
            ChannelFuture future = bootstrap.connect("127.0.0.1", 8080);
            future.sync().channel().closeFuture().sync(); // 断点1
        } catch (Exception e) {
            e.printStackTrace();
            log.debug("timeout");
        } finally {
            group.shutdownGracefully();
        }
    }
}

另外源码部分 io.netty.channel.nio.AbstractNioChannel.AbstractNioUnsafe#connect

@Override
public final void connect(
        final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
    // ...
    // Schedule connect timeout.
    int connectTimeoutMillis = config().getConnectTimeoutMillis();
    if (connectTimeoutMillis > 0) {
        connectTimeoutFuture = eventLoop().schedule(new Runnable() {
            @Override
            public void run() {                
                ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
                ConnectTimeoutException cause =
                    new ConnectTimeoutException("connection timed out: " + remoteAddress); // 断点2
                if (connectPromise != null && connectPromise.tryFailure(cause)) {
                    close(voidPromise());
                }
            }
        }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
    }
	// ...
}

2.2 SO_BACKLOG

  • 属于 ServerSocketChannal 参数
client server syns queue accept queue bind() listen() connect() 1. SYN SYN_SEND put SYN_RCVD 2. SYN + ACK ESTABLISHED 3. ACK put ESTABLISHED accept() client server syns queue accept queue
  1. 第一次握手,client 发送 SYN 到 server,状态修改为 SYN_SEND,server 收到,状态改变为 SYN_REVD,并将该请求放入 sync queue 队列
  2. 第二次握手,server 回复 SYN + ACK 给 client,client 收到,状态改变为 ESTABLISHED,并发送 ACK 给 server
  3. 第三次握手,server 收到 ACK,状态改变为 ESTABLISHED,将该请求从 sync queue 放入 accept queue

其中

  • 在 linux 2.2 之前,backlog 大小包括了两个队列的大小,在 2.2 之后,分别用下面两个参数来控制

  • sync queue - 半连接队列

    • 大小通过 /proc/sys/net/ipv4/tcp_max_syn_backlog 指定,在 syncookies 启用的情况下,逻辑上没有最大值限制,这个设置便被忽略
  • accept queue - 全连接队列

    • 其大小通过 /proc/sys/net/core/somaxconn 指定,在使用 listen 函数时,内核会根据传入的 backlog 参数与系统参数,取二者的较小值
    • 如果 accpet queue 队列满了,server 将发送一个拒绝连接的错误信息到 client

netty 中

可以通过 option(ChannelOption.SO_BACKLOG, 值) 来设置大小

可以通过下面源码查看默认大小

public class DefaultServerSocketChannelConfig extends DefaultChannelConfig
                                              implements ServerSocketChannelConfig {

    private volatile int backlog = NetUtil.SOMAXCONN;
    // ...
}

课堂调试关键断点为:io.netty.channel.nio.NioEventLoop#processSelectedKey

oio 中更容易说明,不用 debug 模式

public class Server {
    public static void main(String[] args) throws IOException {
        ServerSocket ss = new ServerSocket(8888, 2);
        Socket accept = ss.accept();
        System.out.println(accept);
        System.in.read();
    }
}

客户端启动 4 个

public class Client {
    public static void main(String[] args) throws IOException {
        try {
            Socket s = new Socket();
            System.out.println(new Date()+" connecting...");
            s.connect(new InetSocketAddress("localhost", 8888),1000);
            System.out.println(new Date()+" connected...");
            s.getOutputStream().write(1);
            System.in.read();
        } catch (IOException e) {
            System.out.println(new Date()+" connecting timeout...");
            e.printStackTrace();
        }
    }
}

第 1,2,3 个客户端都打印,但除了第一个处于 accpet 外,其它两个都处于 accept queue 中

Tue Apr 21 20:30:28 CST 2020 connecting...
Tue Apr 21 20:30:28 CST 2020 connected...

第 4 个客户端连接时

Tue Apr 21 20:53:58 CST 2020 connecting...
Tue Apr 21 20:53:59 CST 2020 connecting timeout...
java.net.SocketTimeoutException: connect timed out

2.3 ulimit -n

  • 属于操作系统参数

2.4 TCP_NODELAY

  • 属于 SocketChannal 参数

2.5 SO_SNDBUF & SO_RCVBUF

  • SO_SNDBUF 属于 SocketChannal 参数
  • SO_RCVBUF 既可用于 SocketChannal 参数,也可以用于 ServerSocketChannal 参数(建议设置到 ServerSocketChannal 上)

2.6 ALLOCATOR

  • 属于 SocketChannal 参数
  • 用来分配 ByteBuf, ctx.alloc()

2.7 RCVBUF_ALLOCATOR

  • 属于 SocketChannal 参数
  • 控制 netty 接收缓冲区大小
  • 负责入站数据的分配,决定入站缓冲区的大小(并可动态调整),统一采用 direct 直接内存,具体池化还是非池化由 allocator 决定

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/551115.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Windows11部署WSL2以及迁移操作系统位置

1 缘起 笔记本电脑Windows 10内存紧张&#xff1a;16 G&#xff0c; 但是&#xff0c;开发需要一些组件&#xff0c;如Redis&#xff08;Redisearch、ReJson&#xff09;、MySQL等&#xff0c; 在Linux容器化中部署更方便&#xff0c;易用&#xff0c; 在Windows中通过虚拟机安…

安卓与串口通信-modbus篇

前言 在之前的两篇文章中&#xff0c;我们讲解了串口的基础知识和在安卓中使用串口通信的方法&#xff0c;如果还没看过之前文章的同学们&#xff0c;建议先看一遍&#xff0c;不然可能会不理解这篇文章讲的某些内容。 事实上&#xff0c;在实际应用中&#xff0c;我们很少会…

Tip in/Out变速箱齿轮敲击过程详细分析

Tip in/Out变速箱齿轮敲击过程详细分析(模型由AMEsim例子改造而成&#xff0c;数据均虚构&#xff0c;仅学习用&#xff09; 1、发动机稳态工况2、Tip in/Out工况3、总结 1、发动机稳态工况 发动机输出力矩&#xff1a; 一轴齿轮驱动力矩&#xff08;离合器减振器输出力矩&am…

为什么要做问卷调查?企业获得用户心声的捷径

问卷调查作为一种重要的数据收集方法&#xff0c;在市场营销、社会学研究、用户研究等领域得到广泛应用。通过问卷调查&#xff0c;我们可以了解受访者的态度、行为、需求等信息&#xff0c;进而为企业和组织的决策提供支持。那么&#xff0c;为什么要做问卷调查呢&#xff1f;…

大语言模型架构设计

【大模型慢学】GPT起源以及GPT系列采用Decoder-only架构的原因探讨 - 知乎本文回顾GPT系列模型的起源论文并补充相关内容&#xff0c;中间主要篇幅分析讨论为何GPT系列从始至终选择采用Decoder-only架构。 本文首发于微信公众号&#xff0c;欢迎关注&#xff1a;AI推公式最近Ch…

一些云原生开源安全工具介绍

本博客地址&#xff1a;https://security.blog.csdn.net/article/details/130789465 一、Kubernetes安全监测工具kube-bench kube-bench是一个用Golang开发的、由Aqua Security发布的自动化Kubernetes基准测试工具&#xff0c;它运行CIS Kubernetes基准中的测试项目。这些测试…

在 uniapp 中通过 Intent 的方式启动其他APP并且传参

文章目录 前言一、其他软件调用文档中的安卓原生代码二、在uniAPP中实现上述方式三、总结四、感谢 前言 由于业务需求需要&#xff0c;我方研发的安卓APP需要调用其他安卓APP&#xff0c;并且将保存返回的文件存储路径进行读取后操作。对方软件公司提供了对接文档和一个测试调…

docker安装华为gaussdb数据库

docker安装gaussdb docker镜像&#xff1a; http://docker.hub.com/ 这里我们使用docker hub镜像下载&#xff0c;该镜像下载较慢&#xff0c;可能有时访问不同&#xff0c;可以使用阿里云镜像下载&#xff0c;阿里云镜像配置参考《docker国内阿里云镜像加速》 拉取镜像 下载…

程序翻译的过程,linux环境下处理,生成 .i、.s、.o 文件(预处理、编译、汇编、链接)

1. 程序翻译的过程有四个步骤&#xff0c;预处理->编译->汇编->链接。 那么每个步骤是干什么&#xff1f; 预处理阶段&#xff1a;处理-> 头文件、宏替换、条件编译等等&#xff0c;我用 linux 环境查看一下&#xff0c;如下&#xff1a; 首先写一个简单的 .c 文…

【iptables 防火墙设置】

目录 一、iptables概述1、netfilter/iptables 关系 二、四表五链2.1、四表:2.2、五链&#xff1a; 三、规则链之间的匹配顺序四、规则链内的匹配顺序五、iptables的安装配置5.1、安装iptables5.2、配置iptables1、常用的管理选项2、常用的参数3、常用的控制类型4、iptables语法…

ThinkPHP6 模型层的模型属性,表映射关系,以及如何在控制层中使用模型层和模型层中的简单CRUD

ThinkPHP6 模型层的模型属性&#xff0c;表映射关系&#xff0c;以及模型层的CRUD及如何在控制层中使用模型层 1. model 模型层的默认映射规则 模型&#xff0c;即mvc模式中的model层&#xff0c;model层用来对接数据库&#xff0c;操作数据库的增删改查。 在tp6中&#xff…

springboot整合sharding-jdbc实现分库分表详解

目录 一、为什么需要分库分表 1.1 分库分表的优势 二、分库分表基本概念 2.1 垂直分表 2.2 水平分表 2.3 垂直分库 2.4 水平分库 三、分库分表带来的问题 3.1 分布式事务问题 3.2 跨节点关联查询问题 3.3 跨节点分页、排序问题 3.4 主键避重问题 四、分库分表常用…

Java --- 云尚办公之菜单管理模块

一、菜单管理 数据库表&#xff1a; CREATE TABLE sys_menu (id BIGINT(20) NOT NULL AUTO_INCREMENT COMMENT 编号,parent_id BIGINT(20) NOT NULL DEFAULT 0 COMMENT 所属上级,name VARCHAR(20) NOT NULL DEFAULT COMMENT 名称,type TINYINT(3) NOT NULL DEFAULT 0 COMMEN…

LSTM预测汇率涨跌分析

前言 本文主要是采用lstm对汇率涨跌进行预测&#xff0c;是一个二分类的预测问题。 步骤解析 数据构造 原始数据是单变量数据 import pandas as pdfile_path r"./huilv.csv" data pd.read_csv(file_path, usecols[1],encodinggbk) data[level] -1 美元 l…

打造高效接口自动化框架,YAML测试用例封装技巧大揭秘!

目录 前言&#xff1a; 一、框架介绍 本框架包含两个部分&#xff1a; 本框架的构建目标是&#xff1a; 二、框架目录结构 三、规范YAML测试用例封装步骤 四、框架使用 五、总结 前言&#xff1a; 本文介绍了一个基于Python和PyTest的接口自动化框架封装项目实战&#…

最佳实践,高效编写Web自动化测试强制交互方法封装技巧

目录 前言&#xff1a; 一、Web自动化测试的基本原理 二、封装强制交互方法 1、输入框强制交互 2、其他强制交互 三、封装基础类方法 四、总结 前言&#xff1a; Web自动化测试是现代软件开发中必不可少的部分。Web自动化测试可以帮助测试人员快速地验证页面功能并发现潜…

Fiddler 抓包工具下载安装基本使用(详)

在做软件测试或者Bug定位的时候会用到一些抓包工具&#xff0c;当然抓包工具还要一些其他用途可以做一些API的抓取&#xff0c;那么本篇内容就来讲 Fiddler 抓包工具的下载安装以及如何来实际的应用。讲了这些可能有的读者还不知道这个"Fiddler"怎么读呢&#xff1f;…

详解flutter刷新流程,让你的应用更流畅

本文已授权公众号【缦图技术团队】发布 详解flutter刷新流程&#xff0c;让你的应用更流畅 一、概述 Flutter 是谷歌推出的高性能、跨端UI框架&#xff0c;可以通过一套代码&#xff0c;支持 iOS、Android、Windows/MAC/Linux 等多个平台&#xff0c;且能达到原生性能。Flutte…

pthread_getspecific和pthread_setspecific详解

写在前面 在Linux系统中使用C/C进行多线程编程时&#xff0c;我们遇到最多的就是对同一变量的多线程读写问题&#xff0c;大多情况下遇到这类问题都是通过锁机制来处理&#xff0c;但这对程序的性能带来了很大的影响&#xff0c;当然对于那些系统原生支持原子操作的数据类型来…

【CV】Yolov8:ultralytics目标检测、关键点检测、语义分割

note Yolov8提供了一个全新的 SOTA 模型&#xff0c;包括 P5 640 和 P6 1280 分辨率的目标检测网络和基于 YOLACT 的实例分割模型。和 YOLOv5 一样&#xff0c;基于缩放系数也提供了 N/S/M/L/X 尺度的不同大小模型&#xff0c;用于满足不同场景需求骨干网络和 Neck 部分可能参…