Netty—FuturePromise

news2024/11/23 12:41:45

Netty—Future&Promise

  • 一、JDK原生 Future
  • 二、Netty包下的 Future
  • 三、Promise
    • 1、使用Promise同步获取结果
    • 2、使用Promise异步获取结果
    • .3、使用Promise同步获取异常 - sync & get
    • 4、使用Promise同步获取异常 - await
    • 5、使用Promise异步获取异常

在异步处理时,经常用到这两个接口

首先要说明 netty 中的 Future 与 jdk 中的 Future 同名,但是两个接口,netty 的 Future 继承自 jdk 的 Future,而 Promise 又对 netty Future 进行了扩展。

在这里插入图片描述

  • jdk Future 只能同步等待任务结束(或成功、或失败)才能得到结果;
  • netty Future 可以同步等待任务结束得到结果,也可以异步方式得到结果,但都是要等任务结束;
  • netty Promise 不仅有 netty Future 的功能,而且脱离了任务独立存在,只作为两个线程间传递结果的容器。
功能/名称jdk Futurenetty FuturePromise
cancel取消任务--
isCanceled任务是否取消--
isDone任务是否完成,不能区分成功失败--
get获取任务结果,阻塞等待--
getNow-获取任务结果,非阻塞,还未产生结果时返回 null-
await-等待任务结束,如果任务失败,不会抛异常,而是通过 isSuccess 判断-
sync-等待任务结束,如果任务失败,抛出异常-
isSuccess-判断任务是否成功-
isCancellable-是否可以取消执行
cause-获取失败信息,非阻塞,如果没有失败,返回null-
addLinstener-添加回调,异步接收结果-
removeListener-删除回调,异步接收结果
setSuccess--设置成功结果
setFailure--设置失败结果

一、JDK原生 Future

关于 java.util.concurrent包下的Future 接口,我想大家应该都很熟悉,用得最多的就是在使用 Java 的线程池 ThreadPoolExecutor 的时候了。在 submit 一个任务到线程池中的时候,返回的就是一个 Future 实例,通过它来获取提交的任务的执行状态和最终的执行结果,我们最常用它的 isDone()get() 方法。

// 尝试取消执行此任务
boolean cancel(boolean mayInterruptIfRunning);
// 任务是否在正常执行完成之前取消
boolean isCancelled();
// 任务是否完成,完成可能是由于正常终止、异常或取消——在所有这些情况下,此方法都将返回true
boolean isDone();
// 阻塞获取执行结果
V get() throws InterruptedException, ExecutionException;
// 阻塞获取执行结果,指定超时时间
V get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException;

接下来,演示一下使用jdk原生Future获取执行结果~

@Slf4j
public class JdkFutureTest01 {
    public static void main(String[] args) {
        // 线程池
        ExecutorService service = newFixedThreadPool(2);
        // 提交任务
        Future<Object> future = service.submit(new Callable<Object>() {
            @Override
            public Object call() throws Exception {
                log.info("执行计算");
                Thread.sleep(1000);
                return 50;
            }
        });
        try {
            System.out.println(future.get());
            service.shutdownNow();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }

    }
}

二、Netty包下的 Future

原生的Future功能比较有限,Netty扩展了Future并增加了以下方法:

// 判断任务是否成功
boolean isSuccess();
// 判断是否可以取消执行
boolean isCancellable();
// 获取失败的信息
Throwable cause();
// 添加回调方法,异步接收结果
Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
// 添加多个回调方法
Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
// 删除回调方法,异步接收结果
Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
// 删除多个回调方法
Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
// 等待任务结束,如果任务失败,抛出异常
Future<V> sync() throws InterruptedException;
// 同上,区别是不可中断阻塞等待过程
Future<V> syncUninterruptibly();
// 等待任务结束,如果任务失败,不会抛异常,而是通过 isSuccess 判断
Future<V> await() throws InterruptedException;
// 同上,区别是不可中断阻塞等待过程
Future<V> awaitUninterruptibly();
// 等待该future在指定的时间限制内完成。
boolean await(long timeout, TimeUnit unit) throws InterruptedException;
// 等待该future在指定的时间限制内完成。
boolean await(long timeoutMillis) throws InterruptedException;
// 同上,区别是不可中断阻塞等待过程
boolean awaitUninterruptibly(long timeout, TimeUnit unit);
// 同上,区别是不可中断阻塞等待过程
boolean awaitUninterruptibly(long timeoutMillis);
// 获取任务结果,非阻塞,还未产生结果时返回 null
V getNow();

通过以上扩展的方法我们可以发现,Netty的Future增加了 sync()await() 方法用于阻塞等待,还提供了 addListener() 方法用于添加回调方法,异步接收结果。

sync() 方法内部会先调用 await() 方法,等待 await() 方法返回后,会检查该任务是否失败,如果失败则将失败的异常抛出来。即使用await()方法等待任务结束,如果任务失败,不会抛异常,而是需要通过 isSuccess 判断。然而 sync() 方法是直接抛出异常!

@Override
public Promise<V> sync() throws InterruptedException {
    await();
    rethrowIfFailed();
    return this;
}
private void rethrowIfFailed() {
  Throwable cause = cause();
  if (cause == null) {
    return;
  }

  PlatformDependent.throwException(cause);
}

接下来,演示一下使用Netty包下的Future获取执行结果~

@Slf4j
public class NettyFutureTest01 {
    public static void main(String[] args) throws InterruptedException {
        NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        EventLoop eventLoop = eventLoopGroup.next();
        Future<Integer> future = eventLoop.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                log.info("执行计算");
                Thread.sleep(1000);
                return 66;
            }
        });
      	// 阻塞等待
        future.sync();
        log.info("收到结果{}", future.getNow());
        eventLoopGroup.shutdownGracefully();
    }
}

又或者使用 addListener() 方法用于添加回调方法,异步接收结果。

future.addListener(new GenericFutureListener<Future<? super Integer>>() {
    @Override
    public void operationComplete(Future<? super Integer> future) throws Exception {
        log.info("收到结果{}", future.getNow());
        eventLoopGroup.shutdownGracefully();
    }
});

三、Promise

Future支持阻塞等待、添加回调方法、判断执行状态等,而Promise主要是支持状态设置相关方法。当底层I/O操作通过Promise改变执行状态,我们可以通过同步等待的Future立即得到结果。

// 设置成功结果并回调
Promise<V> setSuccess(V result);
// 同上,区别是是否报错
boolean trySuccess(V result);
// 设置失败异常并回调
Promise<V> setFailure(Throwable cause);
// 同上,区别是是否报错
boolean tryFailure(Throwable cause);
// 设置为不可取消状态
boolean setUncancellable();

可见,Promise作为一个特殊的Future,只是增加了一些状态设置方法。所以它常用于传入I/O业务代码中,用于I/O结束后设置成功(或失败)状态,并回调方法。以下是DefaultPromise的继承关系:
在这里插入图片描述

设置promise的状态其实就是原子地修改result字段为传入的执行结果。值得注意的是,result字段带有volatile关键字来确保多线程之间的可见性。另外,设置完毕状态后,会尝试唤醒所有在阻塞等待该promise返回结果的线程。

// result 字段的原子更新器
private static final AtomicReferenceFieldUpdater<DefaultPromise, Object> RESULT_UPDATER =
  AtomicReferenceFieldUpdater.newUpdater(DefaultPromise.class, Object.class, "result");
// 缓存执行结果的字段
private volatile Object result;
// Promise所在的线程
private final EventExecutor executor;
// 一个或多个回调方法
private Object listeners;
// 阻塞线程数量计数器
private short waiters;

@Override
public Promise<V> setSuccess(V result) {
    if (setSuccess0(result)) {
        return this;
    }
    throw new IllegalStateException("complete already: " + this);
}
private boolean setSuccess0(V result) {
	return setValue0(result == null ? SUCCESS : result);
}
private boolean setValue0(Object objResult) {
  // 原子修改result字段为 objResult
  if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
      RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
    if (checkNotifyWaiters()) {
      notifyListeners();
    }
    return true;
  }
  return false;
}
private synchronized boolean checkNotifyWaiters() {
  if (waiters > 0) {
    // 唤醒其他等待线程
    notifyAll();
  }
  return listeners != null;
}

1、使用Promise同步获取结果

@Slf4j
public class PromiseDemo01 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        DefaultEventLoop eventLoop = new DefaultEventLoop();
        Promise<Integer> promise = new DefaultPromise<>(eventLoop);

        eventLoop.execute(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            log.info("set success");
            promise.setSuccess(10);
        });

        log.info("start...");
        log.info("promise.getNow():{}" , promise.getNow());
        log.info("promise.get():{}" , promise.get());
    }
}

2、使用Promise异步获取结果

@Slf4j
public class PromiseDemo03 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        DefaultEventLoop eventLoop = new DefaultEventLoop();
        DefaultPromise<Integer> promise = new DefaultPromise<>(eventLoop);

        eventLoop.execute(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            RuntimeException exception = new RuntimeException("error....hh");
            log.debug("set failure,e: {}", exception.getMessage());
            promise.setFailure(exception);
        });

        log.info("start");
        log.info("promise.getNow():{}" , promise.getNow());
        log.info("promise.get():{}" , promise.get());
    }
}

.3、使用Promise同步获取异常 - sync & get

Slf4j
public class PromiseDemo03 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        DefaultEventLoop eventLoop = new DefaultEventLoop();
        DefaultPromise<Integer> promise = new DefaultPromise<>(eventLoop);

        eventLoop.execute(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            RuntimeException exception = new RuntimeException("error....hh");
            log.debug("set failure,e: {}", exception.getMessage());
            promise.setFailure(exception);
        });

        log.info("start");
        log.info("promise.getNow():{}" , promise.getNow());
        log.info("promise.get():{}" , promise.get());
    }
}

4、使用Promise同步获取异常 - await

@Slf4j
public class PromiseDemo04 {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        DefaultEventLoop eventLoop = new DefaultEventLoop();
        Promise<Integer> promise = new DefaultPromise<>(eventLoop);

        eventLoop.execute(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            RuntimeException exception = new RuntimeException("error....hh");
            log.info("set failure,e: {}", exception.getMessage());
            promise.setFailure(exception);
        });

        log.info("start");
        log.info("promise.getNow():{}" , promise.getNow());
        promise.await();
        if (promise.isSuccess()) {
            log.info("{}", promise.getNow());
        } else {
            log.error("{}", promise.cause().toString());
        }
    }
}

5、使用Promise异步获取异常

@Slf4j
public class PromiseDemo05 {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        DefaultEventLoop eventLoop = new DefaultEventLoop();
        Promise<Integer> promise = new DefaultPromise<>(eventLoop);
        promise.addListener(future -> {
            if (promise.isSuccess()) {
                log.info("{}", promise.getNow());
            } else {
                log.error("{}", promise.cause().toString());
            }
        });

        eventLoop.execute(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            RuntimeException exception = new RuntimeException("error....hh");
            log.info("set failure,e: {}", exception.getMessage());
            promise.setFailure(exception);
        });

        log.info("start");
        log.info("promise.getNow():{}" , promise.getNow());

    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/975123.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

uniapp - 倒计时组件-优化循环时间倒计时

使用定时器的规避方法 为了避免定时器误差导致倒计时计算错误&#xff0c;可以采用一些规避方法&#xff0c;比如将倒计时被中断时的剩余时间记录下来&#xff0c;重新开启定时器时再将这个剩余时间加到新的计算中。同时&#xff0c;为了避免定时器延迟&#xff0c;可以在每次执…

Golang 新手经常踩的坑

1、 Golang 新手经常踩的坑 1.1 前言 Go 是一门简单有趣的编程语言&#xff0c;与其他语言一样&#xff0c;在使用时不免会遇到很多坑&#xff0c;不过它们大多不是 Go 本身的设计缺陷。如果你刚从其他语言转到 Go&#xff0c;那这篇文章里的坑多半会踩到。 如果花时间学习官…

风向变了!智能汽车何以「降本」

随着软件定义汽车的概念逐步落地&#xff0c;以及底盘、动力、座舱、智驾、车身等不同域&#xff08;分布式或者混合式&#xff09;的功能更新迭代和融合&#xff0c;汽车行业正在意识到&#xff1a;底层硬件架构重构的迫切性。 事实上&#xff0c;早在2016年&#xff0c;作为传…

客户端发现pod并与之通信

客户端发现pod并与之通信 pod需要一种寻找其他pod的方法来使用其他pod提供的服务&#xff0c;不像在没有Kubernetes的世界&#xff0c;系统管理员要在用户端配置文件中明确指出服务的精确IP地址 或者主机名来配置每个客户端应用&#xff0c;但同样的方法在Kubernetes中不适用 …

富硒虫草肉丸系列新品上市—虫草可以“享”着吃

2023年9月4日&#xff0c;鸿祥食品(汕尾市)有限公司探索研发的富硒虫草肉丸全系列新品惊喜亮相&#xff0c;为消费者带来全新的滋补体验。五款以富硒虫草为主要原料&#xff0c;分别以猪肉、鲜虾、牛肉、墨鱼以及牛筋为新食品原料加工而成的虫草类健康新品--“享着丸”系列&…

yolov5运行过程遇到的小问题(随时更新)

1.关于git的问题 解决办法&#xff1a;插入下面代码 import os os.environ["GIT_PYTHON_REFRESH"] "quiet"2.页面太小无法完成操作 解决办法: 如果不好使再考虑降低Batch_Size大小或者调整虚拟内存可用硬盘空间大小&#xff01;&#xff08;调整虚拟内存…

公信力不是儿戏:政府与非营利组织如何利用爱校对提升信息质量

公信力是政府和非营利组织成功的基础&#xff0c;而这种公信力大多来源于对外发布的信息的准确性和可靠性。在这个方面&#xff0c;“爱校对”展现了它的强大能力和实用性。以下我们将具体探讨这两种组织如何通过使用爱校对来提升他们的信息质量。 政府&#xff1a;公开与透明&…

【strapi系列】strapi在登录时调用api/auth/local获取token接口一直报401、403、400错误的问题解决

文章目录 问题描述解决403 forbidden问题解决401 (Unauthorized) error问题调用认证接口需用注意的事项&#xff0c;解决400问题 问题描述 strapi在调用api/auth/local登录接口时&#xff0c;一直报403 forbidden 或 401 (Unauthorized) error问题。 这个接口的作用其实就是使…

02-Linux-IO多路复用之select、poll和epoll详解

前言&#xff1a; 在linux系统中&#xff0c;实际上所有的 I/O 设备都被抽象为了文件这个概念&#xff0c;一切皆文件&#xff0c;磁盘、网络数据、终端&#xff0c;甚至进程间通信工具管道 pipe 等都被当做文件对待。 在了解多路复用 select、poll、epoll 实现之前&#xff…

手写Mybatis:第19章-二级缓存

文章目录 一、目标&#xff1a;二级缓存二、设计&#xff1a;二级缓存三、实现&#xff1a;二级缓存3.1 工程结构3.2 二级缓存类图3.3 二级缓存队列3.3.1 FIFI缓存策略3.3.2 事务缓存3.3.3 事务管理3.3.4 修改一级缓存 3.4 缓存执行器3.4.1 执行器接口3.4.2 执行器抽象基类3.4.…

STM32CUBEMX_创建时间片轮询架构的软件框架

STM32CUBEMX_创建时间片轮询架构的软件框架 说明&#xff1a; 1、这种架构避免在更新STM32CUBEMX配置后把用户代码清除掉 2、利用这种时间片的架构可以使得代码架构清晰易于维护 创建步骤&#xff1a; 1、使用STM32CUBEMX创建基础工程 2、新建用户代码目录 3、构建基础的代码框…

uniapp制作——交友盲盒

在小程序端可以有很多好玩的小玩意&#xff0c;目前网上比较有趣的就是有一个交友盲盒&#xff0c;能抽出和找出对象的一个有趣的小程序&#xff0c;所以今天给大家带来用uniapp搭建的交友盲盒&#xff0c;大家再根据自己的情况去搭建自己的后端和数据库来完成自己的一个小项目…

实现Android APK瘦身99.99%

摘要&#xff1a; 如何瘦身是 APK 的重要优化技术。APK 在安装和更新时都需要经过网络下载到设备&#xff0c;APK 越小&#xff0c;用户体验越好。本文作者通过对 APK 内在机制的详细解析&#xff0c;给出了对 APK 各组成成分的优化方法及技术&#xff0c;并实现了一个基本 APK…

制药行业GMP是什么?

药品制造是一项极其关键的行业&#xff0c;它直接涉及到人民的健康和生命安全。因此&#xff0c;确保药品质量、安全性和有效性至关重要。为了实现这一目标&#xff0c;全球范围内都实施了药品生产质量管理规范&#xff0c;通常被简称为GMP&#xff08;Good Manufacturing Prac…

Java8新特性stream和parallelStream有什么区别

1 stream和parallelStream的区别 1.Stream 是在 Java8 新增的特性&#xff0c;普遍称其为流&#xff1b;它不是数据结构也不存放任何数据&#xff0c;其主要用于集合的逻辑处理。 2.Stream流是一个集合元素的函数模型&#xff0c;它并不是集合&#xff0c;也不是数据结构&…

是真卷?还是内耗?这次面试造的火箭着实有点离谱

大家好&#xff0c;我是冰河~~ 都说面试造火箭&#xff0c;工作拧螺丝&#xff0c;不过这次面试造的着实有点离谱&#xff01; 事情是这样的&#xff0c;昨天在冰河的知识星球微信群里&#xff0c;一名小伙伴发了一张面试常州一家公司的面试题的图片&#xff0c;瞬间吸引了我…

十个响应式页面项目

十个响应式页面项目 上接 53 个特效&#xff1a; 53 个 CSS 特效 153 个 CSS 特效 253 个 CSS 特效 3&#xff08;完&#xff09; 照例&#xff0c;预览地址在 http://www.goldenaarcher.com/html-css-js-proj/&#xff0c;git 地址&#xff1a; https://github.com/Goldena…

Seata处理分布式事务之1.7.0

https://blog.csdn.net/zhang33565417/article/details/122768300 1.5.0之后版本发生了很大改变 1.seata安装 1.1官网地址 http://seata.io/zh-cn/ 1.2下载地址 https://github.com/seata/seata/releases 下载的是seata-server-1.7.0.zip 1.3seata相关配置的修改 seata-…

Python学习 -- Math模块和Random模块

math 模块提供了许多数学函数&#xff0c;用于执行各种数学运算。以下是一些常用的 math 函数以及相应的示例代码&#xff1a; math.sqrt(x): 计算平方根。 import math x 25 square_root math.sqrt(x) print(f"√{x} {square_root}") math.pow(x, y): 计算 x …

好用的电容笔有哪些推荐?适合开学买电容笔推荐

尤其是在苹果品牌推出了专属Pencil系列之后&#xff0c;苹果原装的Pencil系列产品&#xff0c;更是将价格压到了所有人都无法企及的地步。市场上有不少可以替代苹果Pencil的平替电容笔&#xff0c;无论是做笔记还是专业的绘画&#xff0c;都完全足够了。在此&#xff0c;我将为…