Netty第二部

news2024/11/27 15:52:36

一、EventLoop和EventLoopGroup

一个Channel可以近似的理解成一个Socket的包装,EventLoop管理这些Channel的

1、EventLoop

EventLoop作为线程,具体Channel由EventLoop管理,在AbstractChannel类的register()方法可以体现

@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    ObjectUtil.checkNotNull(eventLoop, "eventLoop");
    if (isRegistered()) {
        promise.setFailure(new IllegalStateException("registered to an event loop already"));
        return;
    }
    if (!isCompatible(eventLoop)) {
        promise.setFailure(
            new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
        return;
    }

    AbstractChannel.this.eventLoop = eventLoop;

    // 判断是否为当前线程
    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
        try {
            // 不是的话会交给eventLoop执行
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            logger.warn(
                "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                AbstractChannel.this, t);
            closeForcibly();
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }
}

EventLoop类结构图

EventLoop对应的实现类

看看之前用到的NioEventLoop,在NioEventLoop类中定义如下两个属性,Java中线程和队列的组件就会想到线程池

private final Queue<Runnable> taskQueue;

private volatile Thread thread;

2、EventLoopGroup

EventLoopGroup主要是每个新建的Channel分配一个EventLoop以及管理EventLoop

public interface EventLoopGroup extends EventExecutorGroup {
    /**
     * Return the next {@link EventLoop} to use
     */
    @Override
    EventLoop next();

    /**
     * Register a {@link Channel} with this {@link EventLoop}. The returned {@link ChannelFuture}
     * will get notified once the registration was complete.
     */
    ChannelFuture register(Channel channel);

    /**
     * Register a {@link Channel} with this {@link EventLoop} using a {@link ChannelFuture}. The passed
     * {@link ChannelFuture} will get notified once the registration was complete and also will get returned.
     */
    ChannelFuture register(ChannelPromise promise);

    /**
     * Register a {@link Channel} with this {@link EventLoop}. The passed {@link ChannelFuture}
     * will get notified once the registration was complete and also will get returned.
     *
     * @deprecated Use {@link #register(ChannelPromise)} instead.
     */
    @Deprecated
    ChannelFuture register(Channel channel, ChannelPromise promise);
}

线程的分配

服务于 Channel 的 I/O 和事件的 EventLoop 包含在 EventLoopGroup 中。

异步传输实现只使用了少量的EventLoop(以及和它们相关联的Thread),而且在当前的线程模型中,它们可能会被多个 Channel 所共享。这使得可以通过尽可能少量的 Thread 来 支撑大量的Channel,而不是每个Channel分配一个Thread。EventLoopGroup负责为每个 新创建的 Channel 分配一个 EventLoop。在当前实现中,使用顺序循环(round-robin)的方 式进行分配以获取一个均衡的分布,并且相同的 EventLoop 可能会被分配给多个 Channel。 一旦一个 Channel 被分配给一个 EventLoop,它将在它的整个生命周期中都使用这个EventLoop(以及相关联的 Thread)。

线程管理

在内部,当提交任务到如果(当前)调用线程正是支撑EventLoop的线程,那么所提交 的代码块将会被(直接)执行。否则,EventLoop将调度该任务以便稍后执行,并将它放入 到内部队列中。当EventLoop下次处理它的事件时,它会执行队列中的那些任务/事件。

二、Channel接口

  1. Netty 的 Channel 接口所提供的 API,被用于所有的 I/O 操作(bind、connect、read和 write),它大大地降低了直接使用 Socket 类的复杂性
  2. 由于 Channel 是独一无二的,所以为了保证顺序将 Channel 声明为 java.lang.Comparable 的一个子接口。因此,如果两个不同的 Channel 实例都返回了相同的散列码,那么 AbstractChannel 中的 compareTo()方法的实现将会抛出一个 Error。

Channel 的生命周期状态

  • ChannelUnregistered :Channel 已经被创建,但还未注册到 EventLoop
  • ChannelRegistered :Channel 已经被注册到了 EventLoop
  • ChannelActive :Channel 处于活动状态(已经连接到它的远程节点)。它现在可以接 收和发送数据了
  • ChannelInactive :Channel 没有连接到远程节点

当这些状态发生改变时,将会生成对应的事件。这些事件将会被转发给 ChannelPipeline 中的 ChannelHandler,其可以随后对它们做出响应。在我们的编程中,关注 ChannelActive 和 ChannelInactive 会更多一些。

重要 Channel 的方法

  • eventLoop: 返回分配给 Channel 的 EventLoop
  • pipeline: 返回 Channel 的 ChannelPipeline,也就是说每个 Channel 都有自己的 ChannelPipeline。
  • isActive: 如果 Channel 是活动的,则返回 true。活动的意义可能依赖于底层的传输。 例如,一个 Socket 传输一旦连接到了远程节点便是活动的,而一个 Datagram 传输一旦被 打开便是活动的。
  • localAddress: 返回本地的 SokcetAddress
  • remoteAddress: 返回远程的 SocketAddress
  • write: 将数据写到远程节点,注意,这个写只是写往 Netty 内部的缓存,还没有真正 写往 socket。
  • flush: 将之前已写的数据冲刷到底层 socket 进行传输。
  • writeAndFlush: 一个简便的方法,等同于调用 write()并接着调用 flush()

三、ChannelHandlerContext、ChannelPipeline和ChannelHandler

ChannelPipeline接口

当Channel被创建时,它将会被自动地分配一个新的ChannelPipeline,每个Channel都有自己的ChannelPipeline

ChannelPipeline提供了ChannelHandler链的容器,并定义了用于在该链上传播入站(也就是从网络到业务处理)和出站(也就是从业务处理到网络),ChannelHandler都是放在ChannelPipeline中的

ChannelHandler 的生命周期

在ChannelHandler被添加到ChannelPipeline中或者被从ChannelPipeline中移除时会调用下面这些方法。这些方法中的每一个都接受一个ChannelHandlerContext参数。

  • handlerAdded 当把 ChannelHandler 添加到 ChannelPipeline 中时被调用
  • handlerRemoved 当从 ChannelPipeline 中移除 ChannelHandler 时被调用
  • exceptionCaught 当处理过程中在 ChannelPipeline 中有错误产生时被调用

ChannelPipeline中的ChannelHandler

入站(ChannelInboundHandler)和出站(ChannelOutboundHandler)ChannelHandler被安装到同一个 ChannelPipeline中,ChannelPipeline以双向链表的形式进行维护管理

如果此时有入站事件被触发,就会从ChannelPipeline头部开始流动,到达ChannelPipeline的尾部,中间只会经过定义入站的ChannelHandler;反之,出战就会从ChannelPipeline的尾部到头部,中间中间只会经过定义出站的ChannelHandler

Netty能区分入站事件的 Handler和出站事件的Handler,并确保数据只会在具有相同定向类型的两个ChannelHandler之间传递。

ChannelPipeline上的方法

既然 ChannelPipeline 以双向链表的形式进行维护管理 Handler,自然也提供了对应的方 法在 ChannelPipeline 中增加或者删除、替换 Handler。

  • addFirst、addBefore、addAfter、addLast 将一个 ChannelHandler添加到ChannelPipeline中
  • remove 将一个ChannelHandler从ChannelPipeline中移除
  • replace将ChannelPipeline中的一个ChannelHandler替换为另一个ChannelHandler
  • get 通过类型或者名称返回ChannelHandler
  • context返回和ChannelHandler绑定的ChannelHandlerContext
  • names返回ChannelPipeline中所有ChannelHandler的名称
  • ChannelPipeline的API公开了用于调用入站和出站操作的附加方法。

ChannelHandlerContext

ChannelHandlerContext代表了ChannelHandler和ChannelPipeline之间的关联,每当有ChannelHandler添加到ChannelPipeline中时,都会创建ChannelHandlerContext

ChannelHandlerContext 的主要作用就和LinkedList 内部的类Node类似。

Channel、ChannelPipeline 和 ChannelHandlerContext 上的事件传播

channel.write()、channelpipeline.write()会经过所有出站Handler

channelHandlerContext.write()只会经过下一个出站的Handler

ChannelHandlerContext 的 API

alloc 返回和这个实例相关联的 Channel 所配置的 ByteBufAllocator

bind 绑定到给定的 SocketAddress,并返回 ChannelFuture

channel 返回绑定到这个实例的 Channel

close 关闭 Channel,并返回 ChannelFuture

connect 连接给定的 SocketAddress,并返回 ChannelFuture

deregister 从之前分配的 EventExecutor 注销,并返回 ChannelFuture

disconnect 从远程节点断开,并返回 ChannelFuture executor 返回调度事件的 EventExecutor

fireChannelActive 触发对下一个 ChannelInboundHandler 上的

channelActive()方法(已 连接)的调用

fireChannelInactive 触发对下一个 ChannelInboundHandler 上的channelInactive()方法 (已关闭)的调用

fireChannelRead 触发对下一个 ChannelInboundHandler 上的 channelRead()方法(已接 收的消息)的调用

fireChannelReadComplete 触发对下一个 ChannelInboundHandler 上的 channelReadComplete()方法的调用

fireChannelRegistered 触发对下一个 ChannelInboundHandler 上的 fireChannelRegistered()方法的调用

fireChannelUnregistered 触发对下一个 ChannelInboundHandler 上的 fireChannelUnregistered()方法的调用

fireChannelWritabilityChanged 触发对下一个 ChannelInboundHandler 上的 fireChannelWritabilityChanged()方法的调用

fireExceptionCaught 触发对下一个 ChannelInboundHandler 上的 fireExceptionCaught(Throwable)方法的调用

fireUserEventTriggered 触发对下一个 ChannelInboundHandler 上的 fireUserEventTriggered(Object evt)方法的调用

handler 返回绑定到这个实例的 ChannelHandler

isRemoved 如果所关联的 ChannelHandler 已经被从 ChannelPipeline 中移除则返回 true

name 返回这个实例的唯一名称

pipeline 返回这个实例所关联的 ChannelPipeline

read 将数据从 Channel 读取到第一个入站缓冲区;如果读取成功则触发一个 channelRead 事件,并(在最后一个消息被读取完成后)通知 ChannelInboundHandler 的 channelReadComplete(ctx)方法

write 通过这个实例写入消息并经过 ChannelPipeline

writeAndFlush 通过这个实例写入并冲刷消息并经过 ChannelPipeline

当使用 ChannelHandlerContext 的 API 的时候,有以下两点:

  1. ChannelHandlerContext 和ChannelHandler 之间的关联(绑定)是永远不会改变的, 所以缓存对它的引用是安全的;
  2. 相对于其他类的同名方法,ChannelHandlerContext 的方法将产生更短的事件流,应该 尽可能地利用这个特性来获得最大的性能

入站的handler时经过会读到一个buffer中,如果中间不释放会造成内存泄漏;正常情况netty会处理,但是如果handler执行异常这个buffer就永远不会被netty处理,可以实现SimpleChannelInboundHandler接口重写channelRead0()方法,在此方法处理具体业务逻辑

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1175358.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

MySQL索引优化与查询优化

1. 索引失效案例 MySQL中提高性能的一个最有效的方式是对数据表设计合理的索引。索引提供了访问高效数据的方法&#xff0c;并且加快查询的速度&#xff0c;因此索引对查询的速度有着至关重要的影响。 使用索引可以快速地定位表中的某条记录&#xff0c;从而提高数据库查询的速…

C语言查看各数据类型所占大小

编译器&#xff1a;VC2010 #include<stdio.h> int main() {printf("%d\n",sizeof(char));printf("%d\n",sizeof(short));printf("%d\n",sizeof(int));printf("%d\n",sizeof(long));printf("%d\n",sizeof(long long))…

ActiveMq学习⑥__实战篇(Spring 、SpringBoot)

spring 管理ActiveMQ Maven修改&#xff0c;需要添加Spring 支持Jms的pom 包Spring配置文件队列主题在Spring里面实现消费不启动&#xff0c;直接通过配置监听完成 依赖引入 <!--spring整合mq--><!--activemq对JMS的支持&#xff0c;整合SPringle和Activemq-->&l…

圣杯布局/双飞翼布局/flex/grid等,实现CSS三栏自适应布局的几种方法

简介 三栏布局是网页设计中常用的布局&#xff0c;即网页中的内容被分为三块&#xff1a;左侧/中间/右侧。其中两侧部分宽度固定&#xff0c;中间部分宽度自适应的根据浏览器宽度撑满剩余空间。而三栏布局也有很多变形&#xff0c;比如两栏或者N栏布局&#xff0c;上中下三栏布…

【漏洞复现】S2-045 Remote Code Execution(CVE-2017-5638)

感谢互联网提供分享知识与智慧&#xff0c;在法治的社会里&#xff0c;请遵守有关法律法规 文章目录 1.1、漏洞描述1.2、漏洞等级1.3、影响版本1.4、漏洞复现1、基础环境2、漏洞扫描nacs3、漏洞验证 1.5、修复建议 说明内容漏洞编号CVE-2017-5638漏洞名称S2-045 远程代码执行漏…

51单片机-中断

文章目录 前言 前言 #include <reg52.h> #include <intrins.h>sbit key_s2P3^0; sbit flagP3^7;void delay(unsigned int z){unsigned int x,y;for(xz;x>0;x--)for(y114;y>0;y--); }void int_init(){EA1;EX11;IT11;}void main(){int_init();while(1){if (key…

【产品资料】产品经理面试问题(三)

今天和大家免费分享产品经理常见的面试题目&#xff0c;含回答思路分析和回答事例。 【资源下载】 这个资源可以在Axure高保真原型哦小程序里免费下载 打开下方小程序后&#xff0c;搜索产品经理面试题目&#xff0c;获取下载地址 更多原型模板、视频教程、产品文档、定制服…

c++11中的线程库和包装器

c11 1. 线程库1.1 线程库1.2 锁mutex 2. 包装器2.1 funciton2.2 bind 1. 线程库 1.1 线程库 C11中的线程库提供了一种方便的方式来创建和管理线程。其中&#xff0c;std::thread是一个重要的类&#xff0c;它允许我们创建新线程并控制它们的执行。以下是std::thread的一些重要…

磁盘物理结构介绍(磁头,扇区),chs寻址,如何读写,磁盘io消耗时间;线性抽象结构,lba寻址,分区引入

目录 磁盘文件 引入 看待角度 磁盘 介绍 物理结构 俯视图 立体图 磁头 扇区 如何找到一个扇区 -- CHS寻址 如何读写 磁盘io消耗时间 抽象结构 -- 线性 引入 介绍 -- LBA寻址 分区 引入 介绍 磁盘文件 引入 文件分为两种 被打开的文件(主要讨论与进程之间的…

python基础速通

1. 梳理&#xff1a;目前学习了哪几种数据类型&#xff0c; 每一个数据类型定义一个变量&#xff0c;并输出内容以及类型 # 数据类型 # 整型 int_data 1 print(int_data, type(int_data)) # 浮点型 float_data 1.2 print((float_data, type(float_data))) # 复数 complex_da…

计算机毕设 基于大数据的社交平台数据爬虫舆情分析可视化系统

文章目录 0 前言1 课题背景2 实现效果**实现功能****可视化统计****web模块界面展示**3 LDA模型 4 情感分析方法**预处理**特征提取特征选择分类器选择实验 5 部分核心代码6 最后 0 前言 &#x1f525; 这两年开始毕业设计和毕业答辩的要求和难度不断提升&#xff0c;传统的毕…

Monarch Mixer:一种性能比Transformer更强的网络架构

六年前&#xff0c;谷歌团队在arXiv上发表了革命性的论文《Attention is all you need》。作为一种优势的机器学习网络架构&#xff0c;Transformer技术迅速席卷全球。Transformer一直是现代基础模型背后的主力架构&#xff0c;并且在不同的应用程序中取得了令人印象深刻的成功…

[云原生1. ] Docker consul的详细介绍(容器服务的更新与发现)

文章目录 1. 服务注册与发现的概述1.1 cmp问题1.2 解决方法 2. Consul的概述2.1 简介2.2 为什么要使用Consul服务模块2.2 Consul的服务架构2.3 Consul的一些关键特性 3. consul服务部署3.1 前置准备3.2 Consul服务器3.2.1 建立 Consul 服务3.2.2 设置代理&#xff0c;在后台启动…

Linux开发工具的使用(vim、gcc/g++)

文章目录 vimvim基本概念vim的常用三种模式vim三种模式的相互转换vim命令模式下的命令集移动光标删除文字剪切/删除复制替换撤销和恢复跳转至指定行 vim底行模式下的命令集 gcc/ggcc/g的作用gcc/g的语法预处理编译汇编链接函数库动静态库动态链接的优缺点 静态链接的优缺点 vim…

注意,注意,weak_ptr有坑

class Test { public:Test(){cout << "构造函数\n";}~Test(){cout << "析构函数\n";} }; void *operator new(size_t nsize) {void *ptmp std::malloc(nsize);printf("申请内存:%d,%p\n",nsize, ptmp);return ptmp; }void operator…

【油猴脚本】学习笔记

目录 新建用户脚本模板源注释 测试代码获取图标 Tampermonkey v4.19.0 原教程&#xff1a;手写油猴脚本&#xff0c;几分钟学会新技能——王子周棋洛   Tampermonkey首页   面向 Web 开发者的文档   Greasy Fork 新建用户脚本 打开【管理面板】 点击【】&#xff0c;即…

微服务使用指南

微服务使用指南 1.初识微服务 微服务可以认为是一种分布式架构的解决方案&#xff0c;提供服务的独立性和完整性&#xff0c;做到服务的高内聚、低耦合。 目前服务架构主要包含&#xff1a;单体架构和分布式架构。 1.1 单体架构 单体架构&#xff1a;把所有业务功能模块都…

YoloV8目标检测与实例分割——目标检测onnx模型推理

一、模型转换 1.onnxruntime ONNX Runtime&#xff08;ONNX Runtime或ORT&#xff09;是一个开源的高性能推理引擎&#xff0c;用于部署和运行机器学习模型。它的设计目标是优化执行使用Open Neural Network Exchange&#xff08;ONNX&#xff09;格式定义的模型&#xff0c;…

微信怎么批量保存大量照片

8-2 本文要解决的问题是自动或者快速地保存微信收到的图片的事情&#xff0c;如果你的工作中有一个事情是需要每天或者经常保存大量的从微信收到的图片或者视频的&#xff0c;也许本文适合你&#xff0c;本文介绍的方法&#xff0c;可以自动保存各个群或者人发来的图片和视频。…

【LeetCode每日一题合集】2023.9.18-2023.9.24(⭐拓扑排序⭐设计数据结构:LRU缓存实现 LinkedHashMap⭐)

文章目录 337. 打家劫舍 III&#xff08;树形DP&#xff09;2560. 打家劫舍 IV&#xff08;二分查找动态规划&#xff09;LCP 06. 拿硬币&#xff08;简单贪心模拟&#xff09;2603. 收集树中金币⭐思路——拓扑排序删边 2591. 将钱分给最多的儿童&#xff08;分类讨论&#xf…