Netty:入门(2)

news2025/1/8 3:27:28

相关文章:

《IO 模型与多路复用》

《Java NIO》

《Netty:入门(1)》

        写在开头:本文为学习后的总结,可能有不到位的地方,错误的地方,欢迎各位指正。

前言

        在前文中,我们对Netty的内容做了简单的介绍,本文我们会结合Netty的流程图相对深入一些的介绍下其中的重要组件。

一、EventLoop

        EventLoop

        事件循环对象 EventLoop本质是一个单线程执行器(同时维护了一个 Selector),支持异步提交执行任务,线程启动时会调用 NioEventLoop 的 run 方法,执行 I/O 任务和非 I/O 任务:

  • I/O 任务,即 selectionKey 中 ready 的事件,如 accept、connect、read、write 等,由 processSelectedKeys 方法触发。
  • 非 IO 任务,添加到 taskQueue 中的任务,如 register0、bind0 等任务,由 runAllTasks 方法触发。

        它的继承关系如下:

  • 继承自 j.u.c.ScheduledExecutorService,因此包含了线程池中所有的方法
  • 继承自 netty 自己的 OrderedEventExecutor,提供了 boolean inEventLoop(Thread thread) 方法判断一个线程是否属于此 EventLoop,同时还提供了 EventLoopGroup parent() 方法来看看自己属于哪个 EventLoopGroup

        EventLoopGroup

        事件循环组EventLoopGroup是一组 EventLoop,Channel 一般会调用 EventLoopGroup 的 register 方法来绑定其中一个 EventLoop,后续这个 Channel 上的 io 事件都由此 EventLoop 来处理(保证了 io 事件处理时的线程安全)

        继承自 netty 自己的 EventExecutorGroup,实现了 Iterable 接口提供遍历 EventLoop 的,并提供了next方法获取集合中下一个 EventLoop。

        EventLoop初始化时可以指定线程数,可以不指定,如果不指定,这里会传0用来调用父类MultithreadEventLoopGroup中得构造方法

public class NioEventLoopGroup extends MultithreadEventLoopGroup {
    public NioEventLoopGroup() {
        this(0);
    }
    public NioEventLoopGroup(int nThreads) {
        this(nThreads, (Executor) null);
    }
    ...
}

        MultithreadEventLoopGroup类提供1个默认线程数DEFAULT_EVENT_LOOP_THREADS,比较1与Netty系统参数"io.netty.eventLoopThreads"(如未设置则使用CPU核心数*2)中得最大值。当传入得初始化线程数为0时就使用这个默认线程数。

public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {

    private static final InternalLogger logger = InternalLoggerFactory.getInstance(MultithreadEventLoopGroup.class);

    // 默认线程数
    private static final int DEFAULT_EVENT_LOOP_THREADS;

    static {
        // 静态代码块中,给默认线程数DEFAULT_EVENT_LOOP_THREADS 赋值
        // 比较1与Netty系统参数"io.netty.eventLoopThreads"(如未设置则使用CPU核心数*2)中得最大值
        DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
                "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));

        if (logger.isDebugEnabled()) {
            logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
        }
    }

    // 如果传入得线程数为0,则使用默认线程数
    protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
    }
    ...
}

        EventLoop处理普通与定时任务

public class TestEventLoop {
    public static void main(String[] args) {
        // 创建拥有两个EventLoop的NioEventLoopGroup,对应两个线程
        EventLoopGroup group = new NioEventLoopGroup(2);
        // 通过next方法可以获得下一个 EventLoop
        System.out.println(group.next());
        System.out.println(group.next());

        // 通过EventLoop执行普通任务
        group.next().execute(()->{
            System.out.println(Thread.currentThread().getName() + " hello");
        });

        // 通过EventLoop执行定时任务(表示立即执行(0),每次间隔1秒(1))
        group.next().scheduleAtFixedRate(()->{
            System.out.println(Thread.currentThread().getName() + " hello2");
        }, 0, 1, TimeUnit.SECONDS);
        
        // 优雅地关闭
        group.shutdownGracefully();
    }
}

        输出结果如下

io.netty.channel.nio.NioEventLoop@7bb11784
io.netty.channel.nio.NioEventLoop@33a10788
nioEventLoopGroup-2-1 hello
nioEventLoopGroup-2-2 hello2
nioEventLoopGroup-2-2 hello2
nioEventLoopGroup-2-2 hello2

        优雅关闭 shutdownGracefully 方法。该方法会首先切换 EventLoopGroup 到关闭状态从而拒绝新的任务的加入,然后在任务队列的任务都处理完成后,停止线程的运行。从而确保整体应用是在正常有序的状态下退出的。

        下面演示下处理IO任务:

        服务器代码

public class MyServer {
    public static void main(String[] args) {
        new ServerBootstrap()
                // 添加EventLoop
                .group(new NioEventLoopGroup())
                // 选择服务端Channel实现
                .channel(NioServerSocketChannel.class)
                // 添加处理器
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override    // 连接建立后调用
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                // 将服务端收到的数据转bytebuf后再转String
                                ByteBuf buf = (ByteBuf) msg;
                                log.debug(buf.toString(Charset.defaultCharset()));

                            }
                        });
                    }
                })
                .bind(8080);
    }
}

         客户端代码

public class MyClient {
    public static void main(String[] args) throws IOException, InterruptedException {
        Channel channel = new Bootstrap()
                // 添加EventLoop
                .group(new NioEventLoopGroup())
                // 选择客户端Channel实现
                .channel(NioSocketChannel.class)
                // 添加处理器
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override    // 连接建立后调用
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast(new StringEncoder());
                    }
                })
                // 连接到服务器
                .connect(new InetSocketAddress("localhost", 8080))
                .sync()
                .channel();
        System.out.println(channel);
        System.in.read();
    }
}

        EventLoopGroup分工  

        Bootstrap的group()方法可以传入两个EventLoopGroup参数,分别负责处理Accept事件(boss线程)与Read/Write(worker线程)(参考下我们前一篇文章中介绍得流程图)

public class MyServer {
    public static void main(String[] args) {
        new ServerBootstrap()
            	// 两个Group,分别为Boss(负责Accept事件),Worker(负责读写事件)
                .group(new NioEventLoopGroup(1), new NioEventLoopGroup(2))
                // ...
				// 上文中处理IO任务得代码
    }
}

         使用上文中多个客户端分别发送 hello 结果

nioEventLoopGroup-3-1 hello1
nioEventLoopGroup-3-2 hello2
nioEventLoopGroup-3-1 hello3
nioEventLoopGroup-3-2 hello4
nioEventLoopGroup-3-2 hello4

         可以看出,一个EventLoop可以负责多个Channel,且EventLoop一旦与Channel绑定,则一直负责处理该Channel中的事件。

        分工细化

        当有的任务需要较长的时间处理时,可以再添加一个非NioEventLoopGroup,避免同一个NioEventLoop中的其他Channel在较长的时间内都无法得到处理。

public class MyServer {
    public static void main(String[] args) {
        // 增加自定义的非NioEventLoopGroup
        EventLoopGroup group = new DefaultEventLoopGroup();
        
        new ServerBootstrap()
                .group(new NioEventLoopGroup(1), new NioEventLoopGroup(2))
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        // 增加两个handler,第一个使用NioEventLoopGroup处理,第二个使用自定义EventLoopGroup处理
                        socketChannel.pipeline().addLast("nioHandler",new ChannelInboundHandlerAdapter() {
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                ByteBuf buf = (ByteBuf) msg;
                                System.out.println(Thread.currentThread().getName() + " " + buf.toString(StandardCharsets.UTF_8));
                                // 调用下一个handler
                                ctx.fireChannelRead(msg);
                            }
                        })
                        // 该handler绑定自定义的Group
                        .addLast(group, "myHandler", new ChannelInboundHandlerAdapter() {
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                ByteBuf buf = (ByteBuf) msg;
                                System.out.println(Thread.currentThread().getName() + " " + buf.toString(StandardCharsets.UTF_8));
                            }
                        });
                    }
                })
                .bind(8080);
    }
}

         启动四个客户端发送数据

nioEventLoopGroup-4-1 hello1
defaultEventLoopGroup-2-1 hello1
nioEventLoopGroup-4-2 hello2
defaultEventLoopGroup-2-2 hello2
nioEventLoopGroup-4-1 hello3
defaultEventLoopGroup-2-3 hello3
nioEventLoopGroup-4-2 hello4
defaultEventLoopGroup-2-4 hello4

         可以看出,客户端与服务器之间的事件,被nioEventLoopGroup和defaultEventLoopGroup分别处理。

         EventLoopGroup切换源码

        不同的EventLoopGroup切换的实现原理如下:由上面的图可以看出,当handler中绑定的Group不同时,需要切换Group来执行不同的任务。

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    // 获取下一个handler得EventLoop 
    EventExecutor executor = next.executor();
    
    // 判断当前handler中得线程与executor(下一个handler得EventLoop)是否为同一个
    if (executor.inEventLoop()) {
        // 使用当前EventLoopGroup中的EventLoop来处理任务
        next.invokeChannelRead(m);
    } else {
        // 将要执行得代码作为任务交给executor处理
        executor.execute(new Runnable() {
            public void run() {
                next.invokeChannelRead(m);
            }
        });
    }
}
  • 如果两个 handler 绑定的是同一个EventLoopGroup,那么就直接调用
  • 否则,把要调用的代码封装为一个任务对象,由下一个 handler 的 EventLoopGroup 来调用

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/20726.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Spring 中 Bean 对象的存储和取出

由于 Spring 拥有对象的管理权&#xff0c;所以我们也需要拥有较为高效的对象存储和取出的手段&#xff0c;下面我们来分别总结一下&#xff1a; 存对象 配置文件 在存储对象之前&#xff0c;我们需要先配置一下 Spring 的扫描目录&#xff0c;这样 Spring 即可在正确的目录…

JVM复习【面试】

JVM复习【面试】前言推荐复习【JVM】第一部分 走进Java第1章 走进Java /2第二部分 自动内存管理机制第2章 Java内存区域与内存溢出异常 /382.2 运行时数据区 /382.2.2 Java虚拟机栈 /392.3 HotSpot虚拟机对象探秘2.3.1 对象的创建 /442.3.2 对象的内存布局 /472.4 实战&#xf…

HTML-Demo:工商银行电子汇款单

HTML-Demo&#xff1a;工商银行电子汇款单 Date: November 20, 2022 Demo简介&#xff1a; 简要说明一下这个demo 用HTML完成以下表格 知识点简介&#xff1a; 简要介绍其中一些知识点 表格属性 cellspacing 与 cellpadding 功能&#xff1a; cellpadding和cellspacing属性控…

面试:java中的各种锁对比

共享锁 共享锁有CountDownLatch, CyclicBarrier, Semaphore, ReentrantReadWriteLock等 ReadWriteLock&#xff0c;顾名思义&#xff0c;是读写锁。它维护了一对相关的锁 — — “读取锁”和“写入锁”&#xff0c;一个用于读取操作&#xff0c;另一个用于写入操作。“读取锁…

D. Make It Round(贪心 贡献 数学)[Codeforces Round #834 (Div. 3)]

题目如下&#xff1a; 思路 or 题解&#xff1a; 我们先考虑如何操作使结尾有最多的 0 我们不难发现&#xff1a; 2 * 5 10 10 10 我们是否只需要考虑 2 与 5 的贡献就行了 答案是肯定的&#xff01;&#xff01;&#xff01; 约定&#xff1a; cnt5因数5的个数cnt_5 因数 …

kubernetes集群安装Ingress-nginx

文章目录概述搭建环境版本对应关系yaml文件安装实操演示常见问题外链地址概述 Ingress 公开从集群外部到集群内服务的 HTTP 和 HTTPS 路由。 流量路由由 Ingress 资源上定义的规则控制。 Kubernetes 通过 kube-proxy 服务实现了 Service 的对外发布及负载均衡&#xff0c;它的各…

2.3、传输方式

2.3、传输方式 2.3.1、串行&并行 2.3.3.1、串行传输 串行传输是指数据是一个比特一个比特依次发送的。因此&#xff0c;在发送端与接收端之间只需要一条数据传输线路即可 2.3.3.2、并行传输 一次发送 nnn 个比特。为此&#xff0c;在发送端和接收端之间需要有 nnn 条传输…

解决vscode各种异常格式化编译器配置

在vscode中创建vue文件时&#xff0c;若编辑代码时会出现间隔一段时间后自动的格式化内容&#xff0c;会很烦&#xff0c;经反复改查后无果&#xff0c;后来&#xff0c;对编辑器进行全面配置 首先原setting.json文件中的代码是这样的 { "files.autoSave": &qu…

定压补水装置 隔膜式定压补水装置

循环水中气体的来源及危害 A、气体来源 1、补水中夹带气体。 2、在定压不稳时吸入的气体。 3、放水时气体的侵入。 4、管道阀门等设备跑冒滴漏时侵入。 B、气体存在的危害 1、容易形成气阻&#xff0c;增加运营成本。 水中气体不及时排除&#xff0c;它所形成空气袋或气柱&am…

Vue学习(九)——混入

前言 混入&#xff08;mixin&#xff09;的使用非常简单&#xff0c;其实我原本打算直接写插件&#xff08;plugin&#xff09;的&#xff0c;但考虑到插件的使用范围也包括混入和自定义指令&#xff0c;还是先讲讲这两个的基本概念。 混入在我看来&#xff0c;就是给组件加上…

maya 卡通草地制作方法笔记

maya 卡通草地制作方法笔记 一、概述 maya制作草地的方法很多&#xff0c;有粒子替代种子法&#xff0c;painter笔刷法&#xff0c;xgen毛发模拟法&#xff0c;也有直接批量大量代理物体复制法等等。这次讨论的是用maya的painter笔刷法&#xff0c;审核制作卡通类简单的草地效…

Linux 进程概念 —— 冯 • 诺依曼体系结构

文章目录1. 冯诺依曼体系结构&#x1f351; 输入、输出设备&#x1f351; 中央处理器&#x1f351; 内存&#x1f351; 总线&#x1f351; 局部性原理&#x1f351; 总结2. 数据的流动过程1. 冯诺依曼体系结构 在 1945 年冯诺依曼和其他计算机科学家们提出了计算机具体实现的报…

URL和URI的区别

文章目录URLSchemeAuthorityPath to resourceParametersAnchorURL和URI参考URL 以下是 URL 的一些示例&#xff1a; https://developer.mozilla.org https://developer.mozilla.org/en-US/docs/Learn/ https://developer.mozilla.org/en-US/search?qURL这些 URL 中的任何一个…

[附源码]java毕业设计水库水面漂浮物WEB系统

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybatis Maven Vue 等等组成&#xff0c;B/S模式 M…

攻防世界Check

Check 题目描述&#xff1a;套娃&#xff1f; 题目环境&#xff1a;https://download.csdn.net/download/m0_59188912/87097474 将图片放入winhex中打开&#xff0c;文件头与文件尾均正常&#xff0c;文件大小也正常。 判断为lsb隐写&#xff0c;查看三个颜色的plane 0通道&…

mysql复习【面试】

mysql复习【面试】前言mysql复习第08章 索引的创建与设计原则3.索引设计原则3.2哪些情况下适合创建索引3.4 哪些情况不适合创建索引第10章 索引优化与查询优化2.索引失效案例8.覆盖索引9. 如何给字符串添加索引10. 索引下推11. 普通索引 vs 唯一索引12.其他的优化策略13. 淘宝数…

深入浅出学习透析Nginx服务器的基本原理和配置指南「负载均衡篇」

负载均衡 之前的章节内容中【深入浅出学习透析Nginx服务器的基本原理和配置指南「初级实践篇 」】和 【深入浅出学习透析Nginx服务器的基本原理和配置指南「进阶实践篇」】&#xff0c;我们采用的代理仅仅指向一个服务器。但是网站在实际运营过程中&#xff0c;大部分都是以集群…

【万兴PDF专家】OCR引擎的离线安装方法,让你不受网速的折磨,PDF给OCR成可搜索的高级PDF,牛逼了我的万兴

一、问题背景 万兴PDF是一个很好用的PDF工具&#xff0c;它不仅可以实现PDF的浏览和批注常见功能&#xff0c;还具有OCR、压缩PDF&#xff0c;乃至批量化的功能。 因此&#xff0c;实在是一个非常值得花钱去买的PDF工具包&#xff01;&#xff01; 但是&#xff0c;软件里的O…

Prometheus与Grafana监控SpringBoot应用

Prometheus与Grafana监控SpringBoot应用 1.SpringBoot应用暴露端点 2.转换成Prometheus能解析得数据 3.向Prometheus注册时赋予项目名 docker部署 4701模板

七.STM32F030C8T6 MCU开发之TIMER模块级联组成32BIT计时器案例

七.STM32F030C8T6 MCU开发之TIMER模块级联组成32BIT计时器案例 文章目录七.STM32F030C8T6 MCU开发之TIMER模块级联组成32BIT计时器案例0.总体功能概述1.TIM硬件介绍1.1 TIM1/3级联硬件介绍1.1.1 主从模式介绍1.1.2 TIM1为主&#xff0c;TIM3为从&#xff0c;TIM3 的输入触发源选…