Netty 启动源码阅读

news2024/11/22 0:47:52

文章目录

  • 1. 入门
  • 2. Netty 代码实例
  • 3. Netty bind
    • 3.1 initAndRegister
      • 3.1.1 newChannel, 创建 NioServerSocketChannel
      • 3.1.2 init(channel); 初始化 NioServerSocketChannel
      • 3.1.3 register 注册channel
    • 3.2 doBind0 绑定端口
    • 3.3 ServerBootstrapAcceptor

1. 入门

主从Reactor模型 :Acceptor 接收到客户端TCP连接请求并处理完成后, 将新创建的SocketChannel 注册到 I/O线程池 (sub Reactor)传送门

主要步骤:

  • Acceptor 创建 ServerSocketChannel
  • Acceptor ServerSocketChannel 绑定端口
  • Acceptor ServerSocketChannel 设置非阻塞
  • Acceptor 创建 Selector,将 ServerSocketChannel 注册到 Selector 上,监听 SelectionKey.OP_ACCEPT 事件
  • Acceptor ServerSocketChannel 设置分发处理器,处理监听到的SelectionKey.OP_ACCEPT 事件,新创建的 SocketChannel 分发到到 I/O线程池 (sub Reactor)
  • I/O线程池 (sub Reactor)不断轮训处理 SocketChannel 上的读写请求

2. Netty 代码实例

传送门

ServerBootstrap:

  • 配置 EventLoopGroup 线程组:
    需要注意的是, 只绑定一个端口, bossEventLoopGroup 1个就够了, 2个会有一个闲置
   NioEventLoopGroup bossEventLoopGroup = new NioEventLoopGroup(2);
   NioEventLoopGroup workerEventLoopGroup = new NioEventLoopGroup(8);
  • 设置 Channel 类型: NioServerSocketChannel
  • 设置 childHandlerChannelPipeline
  • 绑定端口: 创建 NioServerSocketChannel, 注册, 绑定端口, ServerSocketChannel 添加分发到子线程组的 handler
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.nio.charset.CharacterCodingException;
import java.nio.charset.StandardCharsets;

public class NettyServer01 {
    public static void main(String[] args) {
        // 创建BossGroup和WorkerGroup,分别处理连接接受和数据读写
        NioEventLoopGroup bossEventLoopGroup = new NioEventLoopGroup(2);
        NioEventLoopGroup workerEventLoopGroup = new NioEventLoopGroup(8);

        new ServerBootstrap() // 初始化ServerBootstrap
                .group(bossEventLoopGroup, workerEventLoopGroup) // 设置EventLoopGroup
                .channel(NioServerSocketChannel.class) // 指定服务器通道类
                .childHandler(new ChannelInitializer<NioSocketChannel>() { // 设置通道初始化器
                    /**
                     * 初始化通道,添加处理器到通道的管道中
                     * @param ch 当前初始化的通道
                     */
                    protected void initChannel(NioSocketChannel ch) {
                        // 添加多个处理器,分别处理入站和出站事件
                        ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                            /**
                             * 处理入站数据
                             * @param ctx 通道上下文
                             * @param msg 接收到的消息对象
                             */
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) {
                                ByteBuf byteBuf = inbound((ByteBuf) msg, "1");
                                ctx.fireChannelRead(byteBuf);
                            }
                        });
                        ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws CharacterCodingException {
                                ByteBuf byteBuf = inbound((ByteBuf) msg, "2");
                                ctx.fireChannelRead(byteBuf);
                            }
                        });
                        ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                            /**
                             * 处理入站数据,将处理后的数据写回通道
                             * @param ctx 通道上下文
                             * @param msg 接收到的消息对象
                             */
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) {
                                ByteBuf byteBuf = inbound((ByteBuf) msg, "3");
                                ctx.channel().write(byteBuf);
                            }
                        });
                        ch.pipeline().addLast(new ChannelOutboundHandlerAdapter() {
                            /**
                             * 处理出站数据,在数据写出前进行加工
                             * @param ctx 通道上下文
                             * @param msg 要写出的消息对象
                             * @param promise 写操作的承诺
                             */
                            @Override
                            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
                                ByteBuf byteBuf = outbound((ByteBuf) msg, "4");
                                ctx.writeAndFlush(msg);
                                ctx.write(byteBuf, promise);
                            }
                        });
                        ch.pipeline().addLast(new ChannelOutboundHandlerAdapter() {
                            @Override
                            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
                                ByteBuf byteBuf = outbound((ByteBuf) msg, "5");
                                ctx.write(byteBuf, promise);
                            }
                        });
                        ch.pipeline().addLast(new ChannelOutboundHandlerAdapter() {
                            @Override
                            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
                                ByteBuf byteBuf = outbound((ByteBuf) msg, "6");
                                ctx.write(byteBuf, promise);
                            }
                        });
                    }
                })
                .bind(8080); // 绑定端口并启动服务器
    }

        /**
     * 对出站数据进行处理
     * @param msg 待处理的ByteBuf对象
     * @param no 数据标识号
     * @return 处理后的ByteBuf对象
     */
    private static ByteBuf outbound(ByteBuf msg, String no) {
        ByteBuf byteBuf = msg;
        String output = byteBufToString(byteBuf);
        System.out.printf("\n\noutbound%s output: %s", no, output);
        stringWriteToByteBuf(byteBuf, String.format("\noutbound%s 已处理", no));
        return byteBuf;
    }

    /**
     * 对入站数据进行处理
     * @param msg 待处理的ByteBuf对象
     * @param no 数据标识号
     * @return 处理后的ByteBuf对象
     */
    private static ByteBuf inbound(ByteBuf msg, String no) {
        String input = byteBufToString(msg);
        System.out.printf("\n\ninbound%s input: %s\n", no, input);
        stringWriteToByteBuf(msg, String.format("\ninbound%s 已处理", no));
        return msg;
    }

    /**
     * 将ByteBuf对象转换为字符串
     * @param msg 待转换的ByteBuf对象
     * @return 字符串表示的数据
     */
    private static String byteBufToString(ByteBuf msg) {
        return msg.toString(StandardCharsets.UTF_8);
    }

    /**
     * 将字符串写入ByteBuf对象
     * @param byteBuf 待写入的ByteBuf对象
     * @param msg 要写入的字符串数据
     */
    private static void stringWriteToByteBuf(ByteBuf byteBuf, String msg) {
        byteBuf.writeBytes(msg.getBytes(StandardCharsets.UTF_8));
    }
}

3. Netty bind

主要下面两个方法:

  • final ChannelFuture regFuture = initAndRegister();
    初始化channel,并且注册ServerSocketChannelbossEventLoopGroup 的 一个 EventLoopSelector
  • doBind0(regFuture, channel, localAddress, promise); 绑定端口号
    public ChannelFuture bind(InetAddress inetHost, int inetPort) {
        return bind(new InetSocketAddress(inetHost, inetPort));
    }

    public ChannelFuture bind(SocketAddress localAddress) {
        validate();
        return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));
    }

     private ChannelFuture doBind(final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }

        if (regFuture.isDone()) {
            // At this point we know that the registration was complete and successful.
            ChannelPromise promise = channel.newPromise();
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        } else {
            // Registration future is almost always fulfilled already, but just in case it's not.
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    Throwable cause = future.cause();
                    if (cause != null) {
                        // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                        // IllegalStateException once we try to access the EventLoop of the Channel.
                        promise.setFailure(cause);
                    } else {
                        // Registration was successful, so set the correct executor to use.
                        // See https://github.com/netty/netty/issues/2586
                        promise.registered();

                        doBind0(regFuture, channel, localAddress, promise);
                    }
                }
            });
            return promise;
        }
    }

在这里插入图片描述

3.1 initAndRegister

io.netty.bootstrap.AbstractBootstrap#initAndRegister

初始化channel,并且注册ServerSocketChannelbossEventLoopGroup 的 一个 EventLoopSelector

inal ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            channel = channelFactory.newChannel();
            init(channel);
        } catch (Throwable t) {
            if (channel != null) {
                // channel can be null if newChannel crashed (eg SocketException("too many open files"))
                channel.unsafe().closeForcibly();
                // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
                return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
            }
            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
            return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
        }

        ChannelFuture regFuture = config().group().register(channel);
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }

        // If we are here and the promise is not failed, it's one of the following cases:
        // 1) If we attempted registration from the event loop, the registration has been completed at this point.
        //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
        // 2) If we attempted registration from the other thread, the registration request has been successfully
        //    added to the event loop's task queue for later execution.
        //    i.e. It's safe to attempt bind() or connect() now:
        //         because bind() or connect() will be executed *after* the scheduled registration task is executed
        //         because register(), bind(), and connect() are all bound to the same thread.

        return regFuture;
    }

3.1.1 newChannel, 创建 NioServerSocketChannel

  • 创建 channel = channelFactory.newChannel();
  • io.netty.channel.ReflectiveChannelFactory#newChannel
  • io.netty.channel.socket.nio.NioServerSocketChannel#NioServerSocketChannel
    • NioServerSocketChannel处理连接事件: super(null, channel, SelectionKey.OP_ACCEPT);
    • AbstractNioChannel: 设置非阻塞 ch.configureBlocking(false);
    • AbstractChannel 初始化unsafe、pipeline: unsafe = newUnsafe(); pipeline = newChannelPipeline();
    • DefaultChannelPipeline: tail = new TailContext(this); head = new HeadContext(this);

3.1.2 init(channel); 初始化 NioServerSocketChannel

初始化 Channelio.netty.bootstrap.ServerBootstrap#init, pipeline 添加 ServerBootstrapAcceptor 是一个异步过程,需要 EventLoop 线程负责执行。而当前 EventLoop 线程该去执行 register0() 的注册流程,所以等到 register0() 执行完之后才能被添加到 Pipeline 当中

   p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) throws Exception {
                final ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }

                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
  • 注册 Channel : ChannelFuture regFuture = config().group().register(channel);
    • config().group() 就是 bossEventLoopGroup
  • executeio.netty.util.concurrent.SingleThreadEventExecutor#execute
    • addTask(task);
    • if (!inEventLoop) startThread();

3.1.3 register 注册channel

  • 注册 ServerSocketChannelbossEventLoopGroup 的一个 EventLoopSelector 上,监听 SelectionKey.OP_ACCEPT 事件
ChannelFuture regFuture = config().group().register(channel);
  • io.netty.channel.SingleThreadEventLoop#register(io.netty.channel.ChannelPromise)
@Override
public ChannelFuture register(final ChannelPromise promise) {
    ObjectUtil.checkNotNull(promise, "promise");
    promise.channel().unsafe().register(this, promise);
    return promise;
}
  • io.netty.channel.AbstractChannel.AbstractUnsafe#register
if (eventLoop.inEventLoop()) {
    register0(promise);
} else {
    try {
        eventLoop.execute(new Runnable() {
        @Override
        public void run() {
            register0(promise);
        }
);
  • io.netty.channel.AbstractChannel.AbstractUnsafe#register0
  • io.netty.channel.nio.AbstractNioChannel#doRegister
    • 调用java 的 nio:selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);

3.2 doBind0 绑定端口

io.netty.bootstrap.AbstractBootstrap#doBind0

    private static void doBind0(
            final ChannelFuture regFuture, final Channel channel,
            final SocketAddress localAddress, final ChannelPromise promise) {
        channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                if (regFuture.isSuccess()) {
                    channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                    promise.setFailure(regFuture.cause());
                }
            }
        });
    }
  • io.netty.channel.AbstractChannel#bind(java.net.SocketAddress, io.netty.channel.ChannelPromise)
  • io.netty.channel.DefaultChannelPipeline#bind(java.net.SocketAddress, io.netty.channel.ChannelPromise)
  • io.netty.channel.AbstractChannelHandlerContext#bind(java.net.SocketAddress, io.netty.channel.ChannelPromise)
  • io.netty.channel.AbstractChannelHandlerContext#invokeBind
  • io.netty.channel.DefaultChannelPipeline.HeadContext#bind
  • io.netty.channel.AbstractChannel.AbstractUnsafe#bind
  • io.netty.channel.socket.nio.NioServerSocketChannel#doBind 调用java 的 bind
protected void doBind(SocketAddress localAddress) throws Exception {
    if (PlatformDependent.javaVersion() >= 7) {
        javaChannel().bind(localAddress, config.getBacklog());
    } else {
        javaChannel().socket().bind(localAddress, config.getBacklog());
    }
}

3.3 ServerBootstrapAcceptor

child 就是 workerEventLoopGroupsocketChannel 注册到 workerEventLoopGroup 进行处理

public void channelRead(ChannelHandlerContext ctx, Object msg) {
    final Channel child = (Channel) msg;

    child.pipeline().addLast(childHandler);

    setChannelOptions(child, childOptions, logger);

    for (Entry<AttributeKey<?>, Object> e: childAttrs) {
        child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
    }

    try {
        childGroup.register(child).addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            if (!future.isSuccess()) {
                forceClose(child, future.cause());
                    }
                }
            });
        } catch (Throwable t) {
          forceClose(child, t);
        }
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1905044.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

不是哥们?你怎么抖成这样了?求你进来学学防抖吧!全方位深入剖析防抖的奥秘

前言 古有猴哥三打白骨精&#xff0c;白骨精 > 噶 今有用户疯狂点请求&#xff0c;服务器 > 噶 所以这防抖咱必须得学会&#xff01;&#xff01;&#xff01; 本文就来讲解一下Web前端中防抖的奥秘吧&#xff01;&#xff01;&#xff01;&#xff01; 为什么要做防…

2-27 基于matlab的一种混凝土骨料三维随机投放模型

基于matlab的一种混凝土骨料三维随机投放模型&#xff0c;为混凝土细观力学研究提供一种快捷的三维建模源代码。可设置骨料数量&#xff0c;边界距离、骨料大小等参数。程序已调通&#xff0c;可直接运行。 2-27 matlab 混凝土骨料三维随机投放模型 - 小红书 (xiaohongshu.com)…

盘点8款国内顶尖局域网监控软件(2024年国产局域网监控软件排名)

局域网监控软件对于企业网络管理至关重要&#xff0c;它们可以帮助IT部门维护网络安全&#xff0c;优化网络性能&#xff0c;同时监控和控制内部员工的网络使用行为。以下是八款备受推崇的局域网监控软件&#xff0c;每一款都有其独特的优势和适用场景。 1.安企神软件 试用版领…

CompletionService

必备知识&#xff1a; 三种创建线程的方式 java线程池 CompletionService是Java并发库中的一个接口&#xff0c;用于简化处理一组异步任务的执行和结果收集。它结合了Executor和BlockingQueue的功能&#xff0c;帮助管理任务的提交和完成。CompletionService的主要实现类是Exe…

python破解字母已知但大小写未知密码

python穷举已知字符串中某个或多个字符为大写的所有情况 可以使用递归函数来实现这个功能。以下是一个示例代码&#xff1a; def generate_uppercase_combinations(s, index0, current):if index len(s):print(current)returngenerate_uppercase_combinations(s, index 1, …

如何保证接口幂等性

如何保证接口幂等性 1、幂等性是什么&#xff1f; 接口幂等性是指用户对于同一操作发起的一次请求或者多次请求的结果是一致的&#xff0c;不会因为多次点击而产生了不同的结果。 2、使用幂等性的场景有哪些&#xff1f; 页面点击保存按钮时&#xff0c;不小心快速点了两次…

BUUCTF[PWN][fastbin attack]

fastbin_attack例题 题目&#xff1a;[BUUCTF在线评测 (buuoj.cn)](https://buuoj.cn/challenges#[ZJCTF 2019]EasyHeap) 整体思路&#xff1a;利用编辑时edit_heap函数的栈溢出漏洞&#xff0c;覆盖heaparray中的栈指针指向free的got表&#xff0c;将其改为system的plt表&…

Bert入门-使用BERT(transformers库)对推特灾难文本二分类

Kaggle入门竞赛-对推特灾难文本二分类 这个是二月份学习的&#xff0c;最近整理资料所以上传到博客备份一下 数据在这里&#xff1a;https://www.kaggle.com/competitions/nlp-getting-started/data github&#xff08;jupyter notebook&#xff09;&#xff1a;https://gith…

C语言指针函数指针

跟着这篇文章重新理解了一下&#xff1a;彻底攻克C语言指针 有一个例子感觉可以拿出来看看&#xff1a; char *(*c[10])(int **p); * 这段声明定义了一个长度为10的数组c&#xff0c;数组中的每个元素都是指向函数的指针。每个函数接受一个类型为int **&#xff08;指向指向…

【SpringCloud应用框架】Nacos集群架构说明

第六章 Spring Cloud Alibaba Nacos之集群架构说明 文章目录 前言一、Nacos支持三种部署模式二、集群部署说明三、预备环境 前言 到目前为止&#xff0c;已经完成了对Nacos的一些基本使用和配置&#xff0c;接下来还需要了解一个非常重要的点&#xff0c;就是Nacos的集群相关的…

用PlantUML和语雀画UML类图

概述 首先阐述一下几个简单概念&#xff1a; UML&#xff1a;是统一建模语言&#xff08;Unified Modeling Language&#xff09;的缩写&#xff0c;它是一种用于软件工程的标准化建模语言&#xff0c;旨在提供一种通用的方式来可视化软件系统的结构、行为和交互。UML由Grady…

一.7.(2)基本运算电路,包括比例运算电路、加减运算电路、积分运算电路、微分电路等常见电路的分析、计算及应用;(未完待续)

what id the 虚短虚断虚地? 虚短&#xff1a;运放的正相输入端和反相输入端貌似连在一起了&#xff0c;所以两端的电压相等&#xff0c;即UU- 虚断&#xff1a;输入端输入阻抗无穷大 虚地&#xff1a;运放正相输入端接地&#xff0c;导致U&#xff1d;U-&#xff1d;0。 虚…

远心镜头简介

一、远心镜头 大家都有这种印象&#xff0c;一个物体在人眼看来&#xff0c;会有近大远小的现象。这是因为物体近的时候&#xff0c;在视网膜上投影大&#xff0c;小的时候&#xff0c;投影小。镜头也是一样&#xff0c;因为近大远小的原因&#xff0c;会产生误差。特别是在做尺…

通信协议_Modbus协议简介

概念介绍 Modbus协议&#xff1a;一种串行通信协议&#xff0c;是Modicon公司&#xff08;现在的施耐德电气Schneider Electric&#xff09;于1979年为使用可编程逻辑控制器&#xff08;PLC&#xff09;通信而发表。Modbus已经成为工业领域通信协议的业界标准&#xff08;De f…

Java里的Arrary详解

DK 中提供了一个专门用于操作数组的工具类&#xff0c;即Arrays 类&#xff0c;位于java.util 包中。该类提供了一些列方法来操作数组&#xff0c;如排序、复制、比较、填充等&#xff0c;用户直接调用这些方法即可不需要自己编码实现&#xff0c;降低了开发难度。 java.util.…

DC-DC充放电原理

文章目录 前言1. 电子器件1.1 电容1.2 电感 2. 升压电路3. 降压电路4. 电压均衡电路4.1 被动均衡4.2 主动均衡 5. 我的疑问5.1 对于升压电路&#xff0c;怎么设计升压到多少V后&#xff0c;停止升压&#xff1f;5.2 什么是等效电阻&#xff1f;5.3 快充是如何实现的&#xff1f…

探索InitializingBean:Spring框架中的隐藏宝藏

​&#x1f308; 个人主页&#xff1a;danci_ &#x1f525; 系列专栏&#xff1a;《设计模式》《MYSQL》 &#x1f4aa;&#x1f3fb; 制定明确可量化的目标&#xff0c;坚持默默的做事。 ✨欢迎加入探索MYSQL索引数据结构之旅✨ &#x1f44b; Spring框架的浩瀚海洋中&#x…

ISP和IAP原理解释

ISP和IAP ISP ISP的全称是&#xff1a;In System Programming&#xff0c;即在系统编程&#xff0c;该操作是通过MCU厂商出厂BootLoader来实现&#xff0c;通过ISP可以对主flash区域进行擦除、编程操作&#xff0c;还可以修改芯片的选项字节等。例如&#xff0c;GD32F30x用户…

Failed to get D-Bus connection: Operation not permitted

最近使用wsl安装了centOS7镜像&#xff0c;在系统中安装了docker服务&#xff0c;但是在执行systemctl start docker的时候遇到了&#xff1a;Failed to get D-Bus connection: Operation not permitted问题&#xff0c;查阅了很多资料都没有效果&#xff0c;最终找到了一种解决…

RabbitMQ(集群相关部署)

RabbitMQ 集群部署 环境准备&#xff1a;阿里云centos8 服务器&#xff0c;3台服务器&#xff0c;分别进行安装&#xff1b; 下载Erlang Erlang和RabbitMQ版本对照&#xff1a;https://www.rabbitmq.com/which-erlang.html 创建yum库配置文件 vim /etc/yum.repos.d/rabbi…