探索Netty的FuturePromise

news2025/1/11 0:04:16

Netty—Future&Promise

  • 一、JDK原生 Future
  • 二、Netty包下的 Future
  • 三、Promise
    • 1、使用Promise同步获取结果
    • 2、使用Promise异步获取结果
    • .3、使用Promise同步获取异常 - sync & get
    • 4、使用Promise同步获取异常 - await
    • 5、使用Promise异步获取异常

在异步处理时,经常用到这两个接口

首先要说明 netty 中的 Future 与 jdk 中的 Future 同名,但是两个接口,netty 的 Future 继承自 jdk 的 Future,而 Promise 又对 netty Future 进行了扩展。

在这里插入图片描述

  • jdk Future 只能同步等待任务结束(或成功、或失败)才能得到结果;
  • netty Future 可以同步等待任务结束得到结果,也可以异步方式得到结果,但都是要等任务结束;
  • netty Promise 不仅有 netty Future 的功能,而且脱离了任务独立存在,只作为两个线程间传递结果的容器。
功能/名称jdk Futurenetty FuturePromise
cancel取消任务--
isCanceled任务是否取消--
isDone任务是否完成,不能区分成功失败--
get获取任务结果,阻塞等待--
getNow-获取任务结果,非阻塞,还未产生结果时返回 null-
await-等待任务结束,如果任务失败,不会抛异常,而是通过 isSuccess 判断-
sync-等待任务结束,如果任务失败,抛出异常-
isSuccess-判断任务是否成功-
isCancellable-是否可以取消执行
cause-获取失败信息,非阻塞,如果没有失败,返回null-
addLinstener-添加回调,异步接收结果-
removeListener-删除回调,异步接收结果
setSuccess--设置成功结果
setFailure--设置失败结果

一、JDK原生 Future

关于 java.util.concurrent包下的Future 接口,我想大家应该都很熟悉,用得最多的就是在使用 Java 的线程池 ThreadPoolExecutor 的时候了。在 submit 一个任务到线程池中的时候,返回的就是一个 Future 实例,通过它来获取提交的任务的执行状态和最终的执行结果,我们最常用它的 isDone()get() 方法。

// 尝试取消执行此任务
boolean cancel(boolean mayInterruptIfRunning);
// 任务是否在正常执行完成之前取消
boolean isCancelled();
// 任务是否完成,完成可能是由于正常终止、异常或取消——在所有这些情况下,此方法都将返回true
boolean isDone();
// 阻塞获取执行结果
V get() throws InterruptedException, ExecutionException;
// 阻塞获取执行结果,指定超时时间
V get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException;

接下来,演示一下使用jdk原生Future获取执行结果~

@Slf4j
public class JdkFutureTest01 {
    public static void main(String[] args) {
        // 线程池
        ExecutorService service = newFixedThreadPool(2);
        // 提交任务
        Future<Object> future = service.submit(new Callable<Object>() {
            @Override
            public Object call() throws Exception {
                log.info("执行计算");
                Thread.sleep(1000);
                return 50;
            }
        });
        try {
            System.out.println(future.get());
            service.shutdownNow();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }

    }
}

二、Netty包下的 Future

原生的Future功能比较有限,Netty扩展了Future并增加了以下方法:

// 判断任务是否成功
boolean isSuccess();
// 判断是否可以取消执行
boolean isCancellable();
// 获取失败的信息
Throwable cause();
// 添加回调方法,异步接收结果
Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
// 添加多个回调方法
Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
// 删除回调方法,异步接收结果
Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
// 删除多个回调方法
Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
// 等待任务结束,如果任务失败,抛出异常
Future<V> sync() throws InterruptedException;
// 同上,区别是不可中断阻塞等待过程
Future<V> syncUninterruptibly();
// 等待任务结束,如果任务失败,不会抛异常,而是通过 isSuccess 判断
Future<V> await() throws InterruptedException;
// 同上,区别是不可中断阻塞等待过程
Future<V> awaitUninterruptibly();
// 等待该future在指定的时间限制内完成。
boolean await(long timeout, TimeUnit unit) throws InterruptedException;
// 等待该future在指定的时间限制内完成。
boolean await(long timeoutMillis) throws InterruptedException;
// 同上,区别是不可中断阻塞等待过程
boolean awaitUninterruptibly(long timeout, TimeUnit unit);
// 同上,区别是不可中断阻塞等待过程
boolean awaitUninterruptibly(long timeoutMillis);
// 获取任务结果,非阻塞,还未产生结果时返回 null
V getNow();

通过以上扩展的方法我们可以发现,Netty的Future增加了 sync()await() 方法用于阻塞等待,还提供了 addListener() 方法用于添加回调方法,异步接收结果。

sync() 方法内部会先调用 await() 方法,等待 await() 方法返回后,会检查该任务是否失败,如果失败则将失败的异常抛出来。即使用await()方法等待任务结束,如果任务失败,不会抛异常,而是需要通过 isSuccess 判断。然而 sync() 方法是直接抛出异常!

@Override
public Promise<V> sync() throws InterruptedException {
    await();
    rethrowIfFailed();
    return this;
}
private void rethrowIfFailed() {
  Throwable cause = cause();
  if (cause == null) {
    return;
  }

  PlatformDependent.throwException(cause);
}

接下来,演示一下使用Netty包下的Future获取执行结果~

@Slf4j
public class NettyFutureTest01 {
    public static void main(String[] args) throws InterruptedException {
        NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        EventLoop eventLoop = eventLoopGroup.next();
        Future<Integer> future = eventLoop.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                log.info("执行计算");
                Thread.sleep(1000);
                return 66;
            }
        });
      	// 阻塞等待
        future.sync();
        log.info("收到结果{}", future.getNow());
        eventLoopGroup.shutdownGracefully();
    }
}

又或者使用 addListener() 方法用于添加回调方法,异步接收结果。

future.addListener(new GenericFutureListener<Future<? super Integer>>() {
    @Override
    public void operationComplete(Future<? super Integer> future) throws Exception {
        log.info("收到结果{}", future.getNow());
        eventLoopGroup.shutdownGracefully();
    }
});

三、Promise

Future支持阻塞等待、添加回调方法、判断执行状态等,而Promise主要是支持状态设置相关方法。当底层I/O操作通过Promise改变执行状态,我们可以通过同步等待的Future立即得到结果。

// 设置成功结果并回调
Promise<V> setSuccess(V result);
// 同上,区别是是否报错
boolean trySuccess(V result);
// 设置失败异常并回调
Promise<V> setFailure(Throwable cause);
// 同上,区别是是否报错
boolean tryFailure(Throwable cause);
// 设置为不可取消状态
boolean setUncancellable();

可见,Promise作为一个特殊的Future,只是增加了一些状态设置方法。所以它常用于传入I/O业务代码中,用于I/O结束后设置成功(或失败)状态,并回调方法。以下是DefaultPromise的继承关系:
在这里插入图片描述

设置promise的状态其实就是原子地修改result字段为传入的执行结果。值得注意的是,result字段带有volatile关键字来确保多线程之间的可见性。另外,设置完毕状态后,会尝试唤醒所有在阻塞等待该promise返回结果的线程。

// result 字段的原子更新器
private static final AtomicReferenceFieldUpdater<DefaultPromise, Object> RESULT_UPDATER =
  AtomicReferenceFieldUpdater.newUpdater(DefaultPromise.class, Object.class, "result");
// 缓存执行结果的字段
private volatile Object result;
// Promise所在的线程
private final EventExecutor executor;
// 一个或多个回调方法
private Object listeners;
// 阻塞线程数量计数器
private short waiters;

@Override
public Promise<V> setSuccess(V result) {
    if (setSuccess0(result)) {
        return this;
    }
    throw new IllegalStateException("complete already: " + this);
}
private boolean setSuccess0(V result) {
	return setValue0(result == null ? SUCCESS : result);
}
private boolean setValue0(Object objResult) {
  // 原子修改result字段为 objResult
  if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
      RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
    if (checkNotifyWaiters()) {
      notifyListeners();
    }
    return true;
  }
  return false;
}
private synchronized boolean checkNotifyWaiters() {
  if (waiters > 0) {
    // 唤醒其他等待线程
    notifyAll();
  }
  return listeners != null;
}

1、使用Promise同步获取结果

@Slf4j
public class PromiseDemo01 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        DefaultEventLoop eventLoop = new DefaultEventLoop();
        Promise<Integer> promise = new DefaultPromise<>(eventLoop);

        eventLoop.execute(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            log.info("set success");
            promise.setSuccess(10);
        });

        log.info("start...");
        log.info("promise.getNow():{}" , promise.getNow());
        log.info("promise.get():{}" , promise.get());
    }
}

2、使用Promise异步获取结果

@Slf4j
public class PromiseDemo03 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        DefaultEventLoop eventLoop = new DefaultEventLoop();
        DefaultPromise<Integer> promise = new DefaultPromise<>(eventLoop);

        eventLoop.execute(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            RuntimeException exception = new RuntimeException("error....hh");
            log.debug("set failure,e: {}", exception.getMessage());
            promise.setFailure(exception);
        });

        log.info("start");
        log.info("promise.getNow():{}" , promise.getNow());
        log.info("promise.get():{}" , promise.get());
    }
}

.3、使用Promise同步获取异常 - sync & get

Slf4j
public class PromiseDemo03 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        DefaultEventLoop eventLoop = new DefaultEventLoop();
        DefaultPromise<Integer> promise = new DefaultPromise<>(eventLoop);

        eventLoop.execute(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            RuntimeException exception = new RuntimeException("error....hh");
            log.debug("set failure,e: {}", exception.getMessage());
            promise.setFailure(exception);
        });

        log.info("start");
        log.info("promise.getNow():{}" , promise.getNow());
        log.info("promise.get():{}" , promise.get());
    }
}

4、使用Promise同步获取异常 - await

@Slf4j
public class PromiseDemo04 {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        DefaultEventLoop eventLoop = new DefaultEventLoop();
        Promise<Integer> promise = new DefaultPromise<>(eventLoop);

        eventLoop.execute(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            RuntimeException exception = new RuntimeException("error....hh");
            log.info("set failure,e: {}", exception.getMessage());
            promise.setFailure(exception);
        });

        log.info("start");
        log.info("promise.getNow():{}" , promise.getNow());
        promise.await();
        if (promise.isSuccess()) {
            log.info("{}", promise.getNow());
        } else {
            log.error("{}", promise.cause().toString());
        }
    }
}

5、使用Promise异步获取异常

@Slf4j
public class PromiseDemo05 {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        DefaultEventLoop eventLoop = new DefaultEventLoop();
        Promise<Integer> promise = new DefaultPromise<>(eventLoop);
        promise.addListener(future -> {
            if (promise.isSuccess()) {
                log.info("{}", promise.getNow());
            } else {
                log.error("{}", promise.cause().toString());
            }
        });

        eventLoop.execute(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            RuntimeException exception = new RuntimeException("error....hh");
            log.info("set failure,e: {}", exception.getMessage());
            promise.setFailure(exception);
        });

        log.info("start");
        log.info("promise.getNow():{}" , promise.getNow());

    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1001594.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

颜色代码对照表

颜色代码对照表 各颜色代码: 1 白色 #FFFFFF 2 红色 #FF0000 3 绿色 #00FF00 4 蓝色 #0000FF 5 牡丹红 #FF00FF 6 青色 #00FFFF 7 黄色 #FFFF00 8 黑色 #000000 9 海蓝 #70DB93 …

出行类APP商业化路径解决方案

当下市场主流的商业化路径和方法相比于之前区别不大&#xff0c;开发者们都是在现有商业化体系下&#xff0c;制定更加详细、优质的策略&#xff0c;以期获得更高利益。 出行类App用户结构分析 年龄层次&#xff1a;出行类App用户的年龄分布比较广泛&#xff0c;主要集中在20…

商品防伪查询溯源小程序开发源码

企业商家在销售商品的过程中被人仿冒产品以次充好损害公司名声和影响产品销量是很多企业的痛点。我们基于次帮助企业开发了一款商品防伪溯源查询小程序&#xff0c;解决企业痛点&#xff0c;酿造更好的经商坏境。 核心功能&#xff1a; 我们采用一物一码的逻辑&#xff0c;可…

【网络基础】——HTTPS

目录 HTTPS背景知识 HTTPS是什么&#xff1f; 加密解密 为什么要加密 常见的加密方式 对称加密 非对称加密 数据摘要&&数据指纹 数字签名 HTTPS工作过程探究 方案1&#xff1a;只使用对称加密 方案2&#xff1a;只使用非对称加密 方案3&#xff1a;双方…

day37 线程

一、线程安全 二、多线程并发的安全问题 当多个线程并发操作同一临界资源 由于线程切换实际不确定 导致操作顺序出现混乱 产生的程序bug 严重时出现系统瘫痪 临界资源 &#xff1a;操作该资源的完整流程同一时间只能被单一线程操作的资源 多线程并发会出现的各种问题、 如…

IT运维:使用数据分析平台监控奇安信

监控目标 本文基于鸿鹄2.10.0版本。 ●监控奇安信日志类型分布 ●监控奇安信攻击行为、分析攻击类型 ●监控奇安信攻击来源情况 操作步骤 数据导入 1、创建数据集&#xff0c;如使用已经存在的数据集&#xff0c;可跳过此步骤 数据集名称&#xff1a;qax_syslog&#xff08;仪表…

管理类联考——数学——汇总篇——知识点突破——应用题——鸡兔同笼

⛲️ 一、考点讲解 第一鸡兔同笼问题 若已知脚数之和及鸡兔总数&#xff0c;求各多少只&#xff1a; 假设全都是鸡&#xff0c;则有 兔数 ( 实际脚数 − 2 鸡兔总数 ) ( 4 − 2 ) 兔数(实际脚数-2鸡兔总数)(4-2) 兔数(实际脚数−2鸡兔总数)(4−2) 假设全都是兔&#xff0…

Python 基础知识:语法、数据类型和控制结构

推荐&#xff1a;使用 NSDT场景编辑器 快速搭建3D应用场景 如果开发和环境中已安装 Python&#xff0c;请启动 Python REPL 和代码。或者&#xff0c;如果您想跳过安装并立即开始编码&#xff0c;我建议您前往Google Colab并一起编码。 你好&#xff0c;Python&#xff01; 在…

【算法日志】单调栈: 单调栈简介及其应用

代码随想录刷题60Day 目录 单调栈简介 单调栈的应用 下次更高温 下一个更大元素1 下一个更大元素2 接雨水 柱状图中最大矩形 单调栈简介 单调栈&#xff08;Monotonic Stack&#xff09;是一种特殊的栈数据结构&#xff0c;它满足元素的单调性&#xff0c;这种单调性需…

vue3使用el-form实现登录、注册功能,且进行表单验证(Element Plus中的el-form)

简介&#xff1a;Element Plus 中的 el-form 是一个表单组件&#xff0c;用于快速构建表单并进行数据校验。它提供了丰富的表单元素和验证规则&#xff0c;使表单开发变得更加简单和高效。可以搭配el-dialog实现当前页面的登录、注册页 &#xff0c;这两天在vue3中用到了表单登…

微信小程序 按钮颜色

<button type"primary">主要按钮样式类型</button> <button type"default">默认按钮样式类型</button> <button type"warn">警告按钮样式类型</button> <view>按钮plain是否镂空</view> <bu…

全局光照RSM

Reflective Shadow Maps&#xff08;RSM&#xff09; 一切被直接光照照到的物体&#xff0c;会作为次级光源。 问题1&#xff1a;哪些面片被直接照亮 使用ShadowMap就可以知道哪些面片被直接照亮 问题2&#xff1a;各个面片对P点的贡献分别是多少。 对渲染方程代入如上计算…

CRM客户管理系统能在线使用吗?

经常有朋友私信小编&#xff0c;mac系统/windows系统可以下载使用吗&#xff1f;在哪里下载&#xff1f;其实&#xff0c;Zoho CRM在线即可使用&#xff0c;完全不用下载客户端&#xff01;下面我们就说说&#xff0c;CRM管理系统如何在线用。 Zoho CRM是一款帮助企业进行客户…

恒运资本:什么是五日线?

在股票买卖中&#xff0c;五日线是一种常见的技术剖析指标&#xff0c;用于判别股市的趋势和价格改变。五日线也被称为移动平均线&#xff0c;它是以最近五个买卖日的收盘价为基础核算出的平均值。五日线能够协助出资者识别短期商场意向&#xff0c;及时调整出资战略&#xff0…

Mybatis 框架 ( 四 ) QueryWrapper

4.5.Wrapper条件构造器 Wrapper &#xff1a; 条件构造抽象类&#xff0c;最顶端父类 AbstractWrapper &#xff1a; 用于查询条件封装&#xff0c;生成 sql 的 where 条件 QueryWrapper &#xff1a; Entity 对象封装操作类&#xff0c;不是用lambda语法 UpdateWrapper &am…

【前端】禁止别人调试自己的前端页面代码

无限debugger 前端页面防止调试的方法主要是通过不断 debugger 来疯狂输出断点&#xff0c;因为 debugger 在控制台被打开的时候就会执行由于程序被 debugger 阻止&#xff0c;所以无法进行断点调试&#xff0c;所以网页的请求也是看不到的代码如下&#xff1a; /** * 基础禁止…

参照错误原因排查

报错如下&#xff0c;弹框参照框不显示&#xff1a; 首先定位到错误代码行&#xff0c;排查后发现调用方式、参数均没问题 再到参照引入源头&#xff0c;结合network检测资源加载情况&#xff0c;发现静态资源js文件均正常获取 可以基本判断是参照组件那边出现问题&#xff0c…

算法通关村第十八关——回溯是怎么回事(青铜)

算法通关村第十八关——回溯是怎么回事&#xff08;青铜&#xff09; 前言1. 从N叉树说起1.1 N叉树的定义和特点1.2 N叉树的遍历方式1.3 N叉树在回溯算法中的应用 2. 为什么有的问题暴力搜索也不行2.1 暴力搜索的局限性 3. 回溯递归局部枚举放下前任3.1 回溯算法的基本思想和原…

运维Shell脚本牛刀小试(九): 重定向操作符“>“及双重定向“>>“

运维Shell脚本小试牛刀(一) 运维Shell脚本小试牛刀(二) 运维Shell脚本小试牛刀(三)::$(cd $(dirname $0)&#xff1b; pwd)命令详解 运维Shell脚本小试牛刀(四): 多层嵌套if...elif...elif....else fi_蜗牛杨哥的博客-CSDN博客 Cenos7安装小火车程序动画 运维Shell脚本小试…

c高级day4(9.11)shell脚本(case ....in语句,循环语句,select ...in和case...In结合,辅助控制关键字,函数)

1.实现一个对数组就和的函数&#xff0c;数组通过实参传递给函数 2.写一个函数&#xff0c;输出当前用户的uid和gid&#xff0c;并使用变量接收结果 #!/bin/bash read -a arr sum0 function add() { …