Netty学习——源码篇9 Handler其他处理与异步处理

news2025/1/16 10:56:58

1 ChannelHandlerContext

        每个ChannelHandler被添加到ChannelPipeline后,都会创建一个ChannelHandlerContext,并与ChannelHandler关联绑定。ChannelHandlerContext允许ChannelHandler与其他的ChannelHandler进行交互。ChannelHandlerContext不会改变添加到其中的ChannelHandler,因此它是安全的。ChannelHandlerContext、ChannelHandler、ChannelPipeline的关系如下图:

2 Channel的声明周期

        Netty有一个简单但强大的状态模型,能完美映射到ChannelInboundHandler的各个方法。如下表所示是Channel生命周期四个不同的状态。

         一个Channel正常的生命周期如下图所示。随着状态发生变化产生相应的事件。这些事件被转发到ChannelPipeline中的ChannelHandler来触发相应的操作。

3 ChannelHandler常用的API

        先看一个Netty中整个Handler体系的类关系图。

        Netty定义了良好的类型层次结构来表示不同的处理程序类型,所有类型的父类是ChannelHandler, ChannelHandler提供了在其生命周期内添加或从ChannelPipeline中删除的方法,如下表

        Netty还提供了一个实现了ChannelHandler的抽象类ChannelHandlerAdapter。 ChannelHandlerAdapter实现了父类的所有方法,主要功能就是将请求从一个ChannelHandler往下传递到下一个ChannelHandler,直到全部ChannelHandler传递完毕。也可以直接继承于ChannelHandlerAdapter,然后重写里面的方法。

4 ChannelInboundHandler

        ChannelInboundHandler还提供了一些在接收数据或Channel状态改变时被调用的方法。下面是ChannelInboundHandler的一些方法。

5 异步处理Future

        java.util.concurrent.Future是Java原生API中提供的接口,用来记录异步执行的状态,Future的get方法会判断任务是否执行完成,如果完成立即返回执行结果,否则阻塞线程,知道任务完成再返回。

        Netty扩展了Java的Future,在Future的基础上扩展了监听器(Listener)接口,通过监听器可以让异步执行更加有效率,不需要通过调用get方法来等待异步执行结束,而是通过监听器回调来精确控制异步执行结束时间。

public interface Future<V> extends java.util.concurrent.Future<V> {

    boolean isSuccess();

    boolean isCancellable();


    Throwable cause();

    Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);

    Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

    Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);


    Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

    Future<V> sync() throws InterruptedException;

    
    Future<V> syncUninterruptibly();

    
    Future<V> await() throws InterruptedException;

 
    Future<V> awaitUninterruptibly();

    boolean await(long timeout, TimeUnit unit) throws InterruptedException;

  
    boolean await(long timeoutMillis) throws InterruptedException;

   
    boolean awaitUninterruptibly(long timeout, TimeUnit unit);

   
    boolean awaitUninterruptibly(long timeoutMillis);


    V getNow();

    @Override
    boolean cancel(boolean mayInterruptIfRunning);
}

        ChannelFuture接口有扩展了Netty的Future接口,表示一种没有返回值的异步调用,同时和一个Channel进行绑定。

public interface ChannelFuture extends Future<Void> {

    /**
     * Returns a channel where the I/O operation associated with this
     * future takes place.
     */
    Channel channel();

    @Override
    ChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> listener);

    @Override
    ChannelFuture addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);

    @Override
    ChannelFuture removeListener(GenericFutureListener<? extends Future<? super Void>> listener);

    @Override
    ChannelFuture removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);

    @Override
    ChannelFuture sync() throws InterruptedException;

    @Override
    ChannelFuture syncUninterruptibly();

    @Override
    ChannelFuture await() throws InterruptedException;

    @Override
    ChannelFuture awaitUninterruptibly();

  
    boolean isVoid();
}

6 异步执行Promise

        Promise接口也是Future的扩展接口,它表示一种可写的Future,可以自定义设置异步执行的结果。


/**
 * Special {@link Future} which is writable.
 */
public interface Promise<V> extends Future<V> {

    
    Promise<V> setSuccess(V result);

 
    boolean trySuccess(V result);

    /**
     * Marks this future as a failure and notifies all
     * listeners.
     *
     * If it is success or failed already it will throw an {@link IllegalStateException}.
     */
    Promise<V> setFailure(Throwable cause);

    /**
     * Marks this future as a failure and notifies all
     * listeners.
     *
     * @return {@code true} if and only if successfully marked this future as
     *         a failure. Otherwise {@code false} because this future is
     *         already marked as either a success or a failure.
     */
    boolean tryFailure(Throwable cause);

    /**
     * Make this future impossible to cancel.
     *
     * @return {@code true} if and only if successfully marked this future as uncancellable or it is already done
     *         without being cancelled.  {@code false} if this future has been cancelled already.
     */
    boolean setUncancellable();

    @Override
    Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);

    @Override
    Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

    @Override
    Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);

    @Override
    Promise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

    @Override
    Promise<V> await() throws InterruptedException;

    @Override
    Promise<V> awaitUninterruptibly();

    @Override
    Promise<V> sync() throws InterruptedException;

    @Override
    Promise<V> syncUninterruptibly();
}

        ChannelPromise接口扩展了Promise和ChannelFuture,绑定了Channel,既可以写异步执行结果,又具备了监听者的功能,是Netty实际编程中使用的表示异步执行的接口。

public interface ChannelPromise extends ChannelFuture, Promise<Void> {

    @Override
    Channel channel();

    @Override
    ChannelPromise setSuccess(Void result);

    ChannelPromise setSuccess();

    boolean trySuccess();

    @Override
    ChannelPromise setFailure(Throwable cause);

    @Override
    ChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> listener);

    @Override
    ChannelPromise addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);

    @Override
    ChannelPromise removeListener(GenericFutureListener<? extends Future<? super Void>> listener);

    @Override
    ChannelPromise removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);

    @Override
    ChannelPromise sync() throws InterruptedException;

    @Override
    ChannelPromise syncUninterruptibly();

    @Override
    ChannelPromise await() throws InterruptedException;

    @Override
    ChannelPromise awaitUninterruptibly();

    /**
     * Returns a new {@link ChannelPromise} if {@link #isVoid()} returns {@code true} otherwise itself.
     */
    ChannelPromise unvoid();
}

        DefaultChannelPromise是ChannelPromise的实现类,它是实际运行时的Promise实例。Netty使用addListener方法来回调异步执行的结果。DefaultPromise的addListener()方法的代码如下

    public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
        checkNotNull(listener, "listener");

        synchronized (this) {
            addListener0(listener);
        }

        if (isDone()) {
            notifyListeners();
        }

        return this;
    }
    private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {
        if (listeners == null) {
            listeners = listener;
        } else if (listeners instanceof DefaultFutureListeners) {
            ((DefaultFutureListeners) listeners).add(listener);
        } else {
            listeners = new DefaultFutureListeners((GenericFutureListener<? extends Future<V>>) listeners, listener);
        }
    }
    private void notifyListeners() {
        EventExecutor executor = executor();
        if (executor.inEventLoop()) {
            final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
            final int stackDepth = threadLocals.futureListenerStackDepth();
            if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
                threadLocals.setFutureListenerStackDepth(stackDepth + 1);
                try {
                    notifyListenersNow();
                } finally {
                    threadLocals.setFutureListenerStackDepth(stackDepth);
                }
                return;
            }
        }

        safeExecute(executor, new Runnable() {
            @Override
            public void run() {
                notifyListenersNow();
            }
        });
    }

        从上述代码中可以看到,DefaultChannelPromise会判断异步任务执行的状态,如果执行完毕就立即通知监听者,否则加入监听者队列。通知监听者就是找一个线程来执行调用监听者的回调函数。

        再来看监听者的接口,其实就是一个方法,即等待异步任务执行完毕后,获得Future结果,执行回调的逻辑,代码如下。

public interface GenericFutureListener<F extends Future<?>> extends EventListener {

    /**
     * Invoked when the operation associated with the {@link Future} has been completed.
     *
     * @param future  the source {@link Future} which called this callback
     */
    void operationComplete(F future) throws Exception;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1560371.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

14-项目管理:如何编写高质量的Makefile?

下面给你举个例子&#xff0c;你就会理解低质量的Makefile文件是什么样的了。 build: clean vetmkdir -p ./Roleexport GOOSlinux && go build -v .vet:go vet ./...fmt:go fmt ./...clean:rm -rf dashboard上面这个Makefile存在不少问题。例如&#xff1a;功能简单&a…

基于springboot实现企业客户管理系统项目【项目源码+论文说明】

基于springboot实现企业客户管理系统演示 摘要 本论文主要论述了如何使用JAVA语言开发一个企业客户管理系统&#xff0c;本系统将严格按照软件开发流程进行各个阶段的工作&#xff0c;采用B/S架构&#xff0c;面向对象编程思想进行项目开发。在引言中&#xff0c;作者将论述企…

信息化项目数据质量管理

数据质量管理定义&#xff1a; 对数据从计划、获取、存储、共享、维护、应用、消亡生命周期的每个阶段里可能引发的各类数据质量问题&#xff0c;进行识别、监控、预警、处理等一系列管理活动&#xff0c;并通过改善和提高管理水平使得数据质量获 得进一步提高。 2术语和定义 2…

绝地求生:300万在线已是过去的荣耀和成功,未来之路莫让反作弊绊脚!

PUBG七周年庆典活动已过去两周时间&#xff0c;相比较而言&#xff0c;活动还是比较给力的&#xff0c;大量的黑货票券、G-Coin让很多白嫖党玩家白嫖到了成长型武器、2024生存通行证等高质量皮肤道具&#xff0c;回流和新手玩家大量涌入&#xff0c;游戏热度一度回到Steam前二&…

CCIE-11-IPSec_VPN

目录 实验条件网络拓朴实验目的 开始配置1. R2 Ping R3确定基础网络是通的2. 配置R23. 配置R34. 测试 实验条件 网络拓朴 实验目的 为PC1和PC2建立IPSec VPN PC1可以ping通PC2 开始配置 1. R2 Ping R3确定基础网络是通的 R2#show ip int br Interface IP…

【题解】—— LeetCode一周小结13

【题解】—— 每日一道题目栏 上接&#xff1a;【题解】—— LeetCode一周小结12 25.零钱兑换 II 题目链接&#xff1a;518. 零钱兑换 II 给你一个整数数组 coins 表示不同面额的硬币&#xff0c;另给一个整数 amount 表示总金额。 请你计算并返回可以凑成总金额的硬币组合…

【产品经理】全面解读“数字孪生”

理解数字孪生 随着互联网技术的深入发展&#xff0c;数字孪生被越来越多地提及&#xff0c;那么数字孪生到底是什么&#xff1f;数字孪生&#xff0c;翻译自英文“Digital Twin”&#xff0c;最早在2002年&#xff0c;被从事产品生命周期管理PLM的Michael Grieves教授&#xf…

护眼台灯哪个牌子好?性价比高的护眼台灯推荐

现在生活节奏越来越快&#xff0c;夜间学习、工作已经成为了很多学生党、办公族不可避免的一件事&#xff0c;很多人在劣质的光源下眼睛会出现各种问题。尤其是桌前的那一盏台灯&#xff0c;很多人认为台灯亮度只要够亮就不会伤眼了。 其实不然&#xff0c;要知道光线中的成分…

vant checkbox 复选框 样式改写

修改前 修改后 基于 vant&#xff1a; 4.8.3 unocss: 0.53.4 <van-checkbox-group v-model"query.zczb" shape"square" class"text-16 w-100% flex flex-wrap"><template v-for"item in registerCapitalOption"><v…

AI绘画教程:Midjourney使用方法与技巧从入门到精通

文章目录 一、《AI绘画教程&#xff1a;Midjourney使用方法与技巧从入门到精通》二、内容介绍三、作者介绍&#x1f324;️粉丝福利 一、《AI绘画教程&#xff1a;Midjourney使用方法与技巧从入门到精通》 一本书读懂Midjourney绘画&#xff0c;让创意更简单&#xff0c;让设计…

WPF上使用MaterialDesign框架---下载与配置

一、介绍&#xff1a; Material Design语言的一些重要功能包括 系统字体Roboto的升级版本 &#xff0c;同时颜色更鲜艳&#xff0c;动画效果更突出。杜拉特还简要谈到了新框架的一些变化。谷歌的想法是让谷歌平台上的开发者掌握这个新框架&#xff0c;从而让所有应用就有统一的…

二.音视频编辑-媒体组合-播放

引言 当涉及到音视频编辑时&#xff0c;媒体资源的提取和组合是至关重要的环节。在iOS平台上&#xff0c;AVFoundation框架提供了丰富而强大的功能&#xff0c;使得媒体资源的操作变得轻松而高效。从原始的媒体中提取片段&#xff0c;然后将它们巧妙地组合成一个完整的作品&am…

Adaboost集成学习 | Matlab实现基于SVM-Adaboost支持向量机结合Adaboost集成学习时间序列预测(股票价格预测)

目录 效果一览基本介绍模型设计程序设计参考资料效果一览 基本介绍 Adaboost集成学习 | 基于SVM-Adaboost支持向量机结合Adaboost集成学习时间序列预测(股票价格预测)基于SVM(支持向量机)和AdaBoost集成学习的时间序列预测(如股票价格预测)是一种结合了两种强大机器学习算…

2_2.Linux中的远程登录服务

# 一.Openssh的功能 # 1.sshd服务的用途# #作用&#xff1a;可以实现通过网络在远程主机中开启安全shell的操作 Secure SHell >ssh ##客户端 Secure SHell daemon >sshd ##服务端 2.安装包# openssh-server 3.主配置文件# /etc/ssh/sshd_conf 4.…

浏览器工作原理与实践--编译器和解释器:V8是如何执行一段JavaScript代码的

前面我们已经花了很多篇幅来介绍JavaScript是如何工作的&#xff0c;了解这些内容能帮助你从底层理解JavaScript的工作机制&#xff0c;从而能帮助你更好地理解和应用JavaScript。 今天这篇文章我们就继续“向下”分析&#xff0c;站在JavaScript引擎V8的视角&#xff0c;来分析…

ROS2从入门到精通1-2:详解ROS2服务通信机制与自定义服务

目录 0 专栏介绍1 服务通信模型2 服务模型实现(C)3 服务模型实现(Python)4 自定义服务5 话题、服务通信的异同 0 专栏介绍 本专栏旨在通过对ROS2的系统学习&#xff0c;掌握ROS2底层基本分布式原理&#xff0c;并具有机器人建模和应用ROS2进行实际项目的开发和调试的工程能力。…

蓝桥备赛——贪心

题干 AC Code n, w = map(int, input().split()) # n种类, w核载重 a = [] # [[weight1, value1], [weight2, value2], ...] for _ in range(n):a.append(list(map(int, input().split()))) a.sort(key=lambda x: x[1] / x[0], reverse=True)maxVal = 0for i in a:if i[0…

可视化图表:K线图,快速搞清价格波动。

2023-08-21 21:20贝格前端工场 Hi&#xff0c;我是贝格前端工场的老司机&#xff0c;本文分享可视化图表设计的K线图设计&#xff0c;欢迎老铁持续关注我们。 一、K线图的含义 K线图&#xff08;K Line Chart&#xff09;是一种常用于股票、期货等金融市场的可视化图表&…

【详细讲解WebView的使用与后退键处理】

&#x1f3a5;博主&#xff1a;程序员不想YY啊 &#x1f4ab;CSDN优质创作者&#xff0c;CSDN实力新星&#xff0c;CSDN博客专家 &#x1f917;点赞&#x1f388;收藏⭐再看&#x1f4ab;养成习惯 ✨希望本文对您有所裨益&#xff0c;如有不足之处&#xff0c;欢迎在评论区提出…

前端对数据进行分组和计数处理

js对数组数据的处理&#xff0c;添加属性&#xff0c;合并表格数据。 let data[{id:1,group_id:111},{id:2,group_id:111},{id:3,group_id:111},{id:4,group_id:222},{id:5,group_id:222} ]let tempDatadata; tempDatatempData.reduce((arr,item)>{let findarr.find(i>i…