Vert.x 源码解析(4.x)(一)——Future源码解析

news2024/11/17 13:58:26

目录

在这里插入图片描述

1. 简介

在现代的软件开发中,异步编程已经变得非常重要。它可以提高应用程序的并发性能,使应用程序能够更有效地处理大量的并行操作。Vert.x 是一个面向事件驱动、非阻塞的异步编程框架,它提供了丰富的工具来简化异步编程的复杂性。

如下图就是Vert.x实现异步设计到的类,主要关键在于FutureImpl以及PromiseImpl。下面会介绍他们分别负责什么。
在这里插入图片描述

2.关键类简介

2.1 AsyncResult

这是 Vert.x 中的一个通用接口,用于表示异步操作的结果。它可以包含成功的结果值或失败的异常,允许您在异步操作完成后检查结果,并相应地采取行动。

关键函数如下

public interface AsyncResult<T> {

  //执行完后的值
  T result();

  //异常值
  Throwable cause();
  //判断是否成功
  boolean succeeded();
  //判断是否失败
  boolean failed();
    
  .........
  
}

2.2 Future

它扩展了AsyncResult接口,并且内部增加了很多组合操作符,比如join,any,all,map等

关键函数如下

public interface Future<T> extends AsyncResult<T> {
  static <T> CompositeFuture all(List<? extends Future<?>> futures) {
    return CompositeFutureImpl.all(futures.toArray(new Future[0]));
  }
  static CompositeFuture any(List<? extends Future<?>> futures) {
    return CompositeFutureImpl.any(futures.toArray(new Future[0]));
  }
  static CompositeFuture join(List<? extends Future<?>> futures) {
    return CompositeFutureImpl.join(futures.toArray(new Future[0]));
  }
  default <U> Future<U> flatMap(Function<T, Future<U>> mapper) {
    return compose(mapper);
  }
  //是否完成
  boolean isComplete();
  //完成后的回调监听
  Future<T> onComplete(Handler<AsyncResult<T>> handler);
  //成功监听
  default Future<T> onSuccess(Handler<T> handler) {
    return onComplete(ar -> {
      if (ar.succeeded()) {
        handler.handle(ar.result());
      }
    });
  }
  //失败监听
  default Future<T> onFailure(Handler<Throwable> handler) {
    return onComplete(ar -> {
      if (ar.failed()) {
        handler.handle(ar.cause());
      }
    });
  }
}

2.3 FutureInternal

这个接口主要定义了添加监听器的方法,后续所以监听完成的监听器都是调用这个方法,但是这个是内部调用的。

//上下文,Vert.x线程等都是由这个来执行的,具体后续会出context文章
ContextInternal context();
//添加监听器
void addListener(Listener<T> listener);

2.4 FutureImpl、FutureBase

FutureBase这里就emitSuccess和emitFailure方法主要是是执行listener的方法以及一些转换函数添加监听器的方法,这边就不单独列出了

FutureImpl

这里就主要介绍两个方法

onComplete以及tryComplete

onComplete:传入一个handler,后续任务完成后会调用handler

tryComplete:当你要操作的异步的有结果后会调用tryComplete,接着就会调用OnComplete传入的handler方法(其实实际上暴露出来给我们使用者调用的是PromiseImpl的complete方法,接着再调用tryComplete,这个后面会讲

 //这是结果值
 private Object value;
/**
 * 尝试完成,就是把结果传入进来,接着调用监听器,告知函所有监听者完成信号
 * @param result
 * @return
 */
public boolean tryComplete(T result) {
  Listener<T> l;
  synchronized (this) {
    //如果value有值了直接返回,主要是把传进来的result赋值。
    if (value != null) {
      return false;
    }
    
    value = result == null ? NULL_VALUE : result;
    l = listener;
    listener = null;
  }
  if (l != null) {
    //这就是我们刚刚说的FutureBase定义的接口,里面就是调用监听器的方法
    emitSuccess(result, l);
  }
  return true;
}

/**
   * 添加完成回调的方法
   * @param handler the handler that will be called with the result
   * @return
   */
  @Override
  public Future<T> onComplete(Handler<AsyncResult<T>> handler) {
    Objects.requireNonNull(handler, "No null handler accepted");
    Listener<T> listener;
    //判断是否属于Listener,如果不是则把handler包裹进来,通过listener成功和失败的回调进行执行handler
    if (handler instanceof Listener) {
      listener = (Listener<T>) handler;
    } else {
      listener = new Listener<T>() {
        @Override
        public void onSuccess(T value) {
          try {
              //成功后调用handle
            handler.handle(FutureImpl.this);
          } catch (Throwable t) {
            if (context != null) {
              context.reportException(t);
            } else {
              throw t;
            }
          }
        }
        @Override
        public void onFailure(Throwable failure) {
          try {
              //失败后调用
            handler.handle(FutureImpl.this);
          } catch (Throwable t) {
            if (context != null) {
              context.reportException(t);
            } else {
              throw t;
            }
          }
        }
      };
    }
    //就是调用FutureInternal的添加监听器的方法。
    addListener(listener);
    return this;
  }

2.5 Promise、PromiseInternal、PromiseImpl

Promise

public interface Promise<T> extends Handler<AsyncResult<T>> {
  //提供直接返回实现类实例
  static <T> Promise<T> promise() {
    return new PromiseImpl<>();
  }
  //这里页定义了tryComplete
  boolean tryComplete(T result);   
  //看到了吧,这个我们使用者其实是调用这个complete方法,它内部会调用tryComplete方法来
  default void complete(T result) {
    if (!tryComplete(result)) {
      throw new IllegalStateException("Result is already complete");
    }
  }  
}

PromiseInternal

内部增加了一个context获取方法

ContextInternal context();

PromiseImpl

看以下代码PromiseImpl继承了FutureImpl

public final class PromiseImpl<T> extends FutureImpl<T> implements PromiseInternal<T>, Listener<T> {
  @Override
  public Future<T> future() {
    return this;
  }
    
}

2.6 疑问

是不是疑问来了

  1. 为什么还需要Promise来调用complete,不直接用FutureImpl呢?

职责分明问题FutureImpl是结果容器,代表未来会生成结果的容器,接着通过监听器来告知你这个结果。

PromiseImpl 是可以允许你手动设置异步操作结果所以Future像Get,Promise是Set

2.7 其他

CompositeFutureImpl:继承FutureImpl,用于实现 CompositeFuture 接口的默认实现。CompositeFuture 用于组合多个异步操作,等待它们全部完成或任意一个完成(Future做组合变换等操作都是继承FutureImpl类实现,包括PromiseImpl,Otherwise,Mapping等

3. 入门实例

3.1 案例1(独立使用Future)

    //创建Promise
    Promise promise=new PromiseImpl();
    //根据Promise获取Future
    Future future=promise.future();

    //模拟异步任务
    new Thread(() -> {
      try {
        Thread.sleep(5000);
        //执行完后调用complete方法,并传入结果
        promise.complete(123);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }).start();
    //回调
    future.onComplete(event -> {
      System.out.println(event+"成功了");
    });
    System.out.println("代码执行完成");

    Thread.sleep(15000);

结果:

代码执行完成
Future{result=123}成功了

3.2 案例2(Vert.x内部封装的文件调用)

这是Vert.x内部打开文件的方法,内部也是使用了Future和Promise。后面会分析源代码如何实现

FileSystem fs = vertx.fileSystem();

Future<FileProps> future = fs.props("/my_file.txt");

future.onComplete((AsyncResult<FileProps> ar) -> {
  if (ar.succeeded()) {
    FileProps props = ar.result();
    System.out.println("File size = " + props.size());
  } else {
    System.out.println("Failure: " + ar.cause().getMessage());
  }
});

4.源码分析

根据入门实例,进行源码分析

前面是直接创建PromiseImpl,接着调用promise.future直接获取Future类了。

我们重点分析的就是OnComplete类和complete类。

//创建Promise
Promise promise=new PromiseImpl();
//根据Promise获取Future
Future future=promise.future();

//模拟异步任务
new Thread(() -> {
  try {
    Thread.sleep(5000);
    //执行完后调用complete方法
    promise.complete(123);
  } catch (InterruptedException e) {
    e.printStackTrace();
  }
}).start();
//回调
future.onComplete(event -> {
  System.out.println(event+"成功了");
});
System.out.println("代码执行完成");

4.1 OnComplete

public Future<T> onComplete(Handler<AsyncResult<T>> handler) {
  Objects.requireNonNull(handler, "No null handler accepted");
  Listener<T> listener;
  //判断是否属于Listener,如果不是则把handler包裹进来,通过listener成功和失败的回调进行执行handler
  if (handler instanceof Listener) {
    listener = (Listener<T>) handler;
  } else {
    listener = new Listener<T>() {
      @Override
      public void onSuccess(T value) {
        try {
          handler.handle(FutureImpl.this);
        } catch (Throwable t) {
          if (context != null) {
            context.reportException(t);
          } else {
            throw t;
          }
        }
      }
      @Override
      public void onFailure(Throwable failure) {
        try {
          handler.handle(FutureImpl.this);
        } catch (Throwable t) {
          if (context != null) {
            context.reportException(t);
          } else {
            throw t;
          }
        }
      }
    };
  }
  //就是调用FutureInternal的添加监听器的方法。
  addListener(listener);
  return this;
}

addListener

@Override
public void addListener(Listener<T> listener) {
  Object v;
  synchronized (this) {
    //将value赋值,这就是判断当前Future是否已存在结果值
    v = value;
    //如果等于null说明现在还没有结果值
    //因为可能添加监听器的时候已经有值了,那么直接调用监听器方法
    if (v == null) {
      //如果等于空就赋值
      if (this.listener == null) {
        this.listener = listener;
      } else {
        ListenerArray<T> listeners;
        //根据类型是单一还是列表要么转换,要么new再添加
        if (this.listener instanceof FutureImpl.ListenerArray) {
          listeners = (ListenerArray<T>) this.listener;
        } else {
          listeners = new ListenerArray<>();
          listeners.add(this.listener);
          this.listener = listeners;
        }
        listeners.add(listener);
      }
      return;
    }
  }
  if (v instanceof CauseHolder) {
    emitFailure(((CauseHolder)v).cause, listener);
  } else {
    if (v == NULL_VALUE) {
      v = null;
    }
    emitSuccess((T) v, listener);
  }
}

emitSuccess 这个方法再complete会说

4.2 complete

promise.complete(123);

complete里面默认就是调用tryComplete并传入值

default void complete(T result) {
  if (!tryComplete(result)) {
    throw new IllegalStateException("Result is already complete");
  }
}
/**
 * 尝试完成,就是把结果传入进来,接着调用监听器,告知函所有监听者完成信号
 * @param result
 * @return
 */
public boolean tryComplete(T result) {
  Listener<T> l;
  synchronized (this) {
    //如果value有值了直接返回,主要是把传进来的result赋值。
    if (value != null) {
      return false;
    }

    value = result == null ? NULL_VALUE : result;
    l = listener;
    listener = null;
  }
  if (l != null) {
    //这就是我们刚刚说的FutureBase定义的接口,里面就是调用监听器的方法
    emitSuccess(result, l);
  }
  return true;
}

emitSuccess

protected final void emitSuccess(T value, Listener<T> listener) {
  //context不等于空,则再context里面的线程进行执行,调用listener.onSuccess
  if (context != null && !context.isRunningOnContext()) {
    context.execute(() -> {
      ContextInternal prev = context.beginDispatch();
      try {
        listener.onSuccess(value);
      } finally {
        context.endDispatch(prev);
      }
    });
  } else {
    //这边直接执行
    listener.onSuccess(value);
  }
}

5 总结

创建PromiseImpl,并且获取Future类,通过Future.OnComplete来添加监听器,通过Promise的complete设置值并且通知监听器。是不是很简单。

后续我看情况再写一篇关于针对于Future的其他实现类,来解释all,any,map等原理

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/957576.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

误删文件恢复怎么做?2023最新方法公布!

“突然感觉闯了大祸&#xff0c;在用朋友的电脑时&#xff0c;误删了一些他电脑里非常重要的文件&#xff0c;现在真的感觉很对不起&#xff0c;有什么方法可以找回这些误删的文件吗&#xff1f;非常着急&#xff0c;希望大家给我一些建议&#xff01;” 如今&#xff0c;电脑已…

Go:关于‘fresh‘ 不是内部或外部命令,也不是可运行的程序问题的解决方案

如果你使用了go get命令来安装fresh包&#xff0c;那么fresh命令可能没有被正确添加到系统的PATH环境变量中&#xff0c;需要修改你的fresh.exe的文件存放位置。 一般而言&#xff0c;你会将GO的安装文件夹Go与工作区文件夹GoProjects分开&#xff08;你的文件夹名称与我的不同…

linux刻录iso到u盘

需要的工具&#xff1a;Linux系统、U盘、ISO镜像文件。 首先在Linux系统中打开终端&#xff0c;使用dd命令&#xff0c;格式如下&#xff1a; sudo dd ifxxx.iso of/dev/sdb 命令中xxx.iso是你的ISO镜像文件的路径&#xff0c;of后面的你的U盘路径&#xff0c;一般就是/dev/sdb…

软件架构设计(二) 软件架构风格其他风格简介

架构师备战(四)-软件架构设计(二) 软件架构风格其他风格简介 架构风格其实是很重要的知识,我们先了解了基本的五种架构风格, 我们之前也提到除了这五种风格之外, 还有一些没有收录在这几种风格之内的, 这次会去做一个探索。 1、闭环控制架构风格(过程控制)【重要】 概念 …

ChatGPT Prompting开发实战(四)

一、chaining prompts应用解析及输出文本的设定 由于输入和输出都是字符串形式的自然语言&#xff0c;为了方便输入和输出信息与系统设定使用的JSON格式之间进行转换&#xff0c;接下来定义从输入字符串转为JSON list的方法&#xff1a; 定义从JSON list转为输出字符串的方法&…

基于北方苍鹰算法优化的BP神经网络(预测应用) - 附代码

基于北方苍鹰算法优化的BP神经网络&#xff08;预测应用&#xff09; - 附代码 文章目录 基于北方苍鹰算法优化的BP神经网络&#xff08;预测应用&#xff09; - 附代码1.数据介绍2.北方苍鹰优化BP神经网络2.1 BP神经网络参数设置2.2 北方苍鹰算法应用 4.测试结果&#xff1a;5…

C#安装“Windows 窗体应用(.NET Framework)”

目录 背景: 第一步: 第二步: 第三步&#xff1a; 总结: 背景: 如下图所示:在Visual Studio Installer创建新项目的时候&#xff0c;想要添加windows窗体应用程序&#xff0c;发现里面并没有找到Windows窗体应用(.NET Framework)模板&#xff0c;快捷搜索也没有发现&#…

插入排序(Insertion Sort)

C自学精简教程 目录(必读) 插入排序 每次选择未排序子数组中的第一个元素&#xff0c;从后往前&#xff0c;插入放到已排序子数组中&#xff0c;保持子数组有序。 打扑克牌&#xff0c;起牌。 输入数据 42 20 17 13 28 14 23 15 执行过程 完整代码 #include <iostream…

springboot的mybatis问题

自动映射 在数据库列名和java类属性名相同的情况&#xff0c;mybatis会自动将数据库的值自动匹配到java类的属性当中。 java的price等变量 mysql的price等字段 mybatis会自动将数据库的值自动匹配到java类的属性当中。 开启驼峰命名 在application中配置 mybatis:type-…

【ArcGIS Pro二次开发】(64):多分式标注

在ArcGIS中有时会遇到需要二分式标注的情况&#xff0c;有时甚至是三分式、四分式。 通过输入标注表达式&#xff0c;可以做出如下的效果&#xff0c;但是代码不短&#xff0c;每次都要输一遍也挺麻烦。 网上也有一些分式标注的python工具&#xff0c;但不够直观&#xff0c;于…

Flink 如何定位反压节点?

分析&回答 Flink Web UI 自带的反压监控 —— 直接方式 Flink Web UI 的反压监控提供了 Subtask 级别的反压监控。监控的原理是通过Thread.getStackTrace() 采集在 TaskManager 上正在运行的所有线程&#xff0c;收集在缓冲区请求中阻塞的线程数&#xff08;意味着下游阻…

CXL 存储设备标签存储区(LSA)

&#x1f525;点击查看精选 CXL 系列文章&#x1f525; &#x1f525;点击进入【芯片设计验证】社区&#xff0c;查看更多精彩内容&#x1f525; &#x1f4e2; 声明&#xff1a; &#x1f96d; 作者主页&#xff1a;【MangoPapa的CSDN主页】。⚠️ 本文首发于CSDN&#xff0c…

Maven基础的快速入门

导读 概念&#xff1a;Maven是apache旗下的一个开源项目&#xff0c;是一款用于管理和构建Java项目的工具 Maven的作用&#xff1a; 1.依赖管理&#xff1a;放便快捷的管理项目依赖的资源&#xff08;jar包&#xff09;&#xff0c;避免版本冲突的问题 2.统一项目结构&…

Flink中RPC实现原理简介

前提知识 Akka是一套可扩展、弹性和快速的系统&#xff0c;为此Flink基于Akka实现了一套内部的RPC通信框架&#xff1b;为此先对Akka进行了解 Akka Akka是使用Scala语言编写的库&#xff0c;基于Actor模型提供一个用于构建可扩展、弹性、快速响应的系统&#xff1b;并被应用…

Git使用——GitHub项目回退版本

查看历史版本 使用git log命令查看项目的历史版本&#xff1a; 可以一直回车&#xff0c;直到找到想要的历史版本&#xff0c;复制commit后面的那一串id。 恢复历史版本 执行命令 git reset --hard 版本号&#xff1a; git reset --hard 39ac3ea2448e81ea992b7c4fdad9252983…

防溺水方案:安防监控视频/智能分析网关AI识别技术助力防溺水监管

溺水是造成许多人死亡的主要原因之一。无论是在游泳池、河流、湖泊还是海洋中&#xff0c;溺水都可能导致人们失去生命。即使没有造成死亡&#xff0c;溺水所引发的窒息和水下活动中的创伤等伤害&#xff0c;有可能引起长期甚至永久性的身体损伤&#xff0c;对个人和家庭造成巨…

keras深度学习框架通过卷积神经网络cnn实现手写数字识别

昨天通过keras构建简单神经网络实现手写数字识别&#xff0c;结果在最后进行我们自己的手写数字识别的时候&#xff0c;准确率堪忧&#xff0c;只有60%。今天通过卷积神经网络来实现手写数字识别。 构建卷积神经网络和简单神经网络思路类似&#xff0c;只不过这里加入了卷积、池…

分布式 - 服务器Nginx:基础系列之Nginx配置文件结构

文章目录 1.Nginx 配置文件结构2. Nginx 全局块的指令01. user 指令02. master_process 指令03. worker_processes 指令04. deamon 指令05. pid 指令06. error_log 指令07. include 指令 3. Nginx events块的指令01. accept_mutex 指令02. multi_accept 指令03. worker_connect…

【阅读笔记】如何正确地学习编程?

2023年9月1日&#xff0c;周五上午 本次阅读的文章来自&#xff1a; 为什么我学个 JAVA 就已经耗尽所有&#xff0c;而有些人还能同时学习多门语言&#xff1f; - invalid s的回答 - 知乎 https://www.zhihu.com/question/485917018/answer/2216877333 令我感到有趣的是&#…

Flink的checkpoint是怎么实现的?

分析&回答 Checkpoint介绍 Checkpoint容错机制是Flink可靠性的基石,可以保证Flink集群在某个算子因为某些原因(如 异常退出)出现故障时,能够将整个应用流图的状态恢复到故障之前的某一状态,保证应用流图状态的一致性。Flink的Checkpoint机制原理来自“Chandy-Lamport alg…