ExecutorService、Callable、Future实现有返回结果的多线程原理解析

news2024/9/28 13:34:27

在并发多线程场景下,存在需要获取各线程的异步执行结果,这时,就可以通过ExecutorService线程池结合Callable、Future来实现。

我们先来写一个简单的例子——

public class ExecutorTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Callable callable = new MyCallable();
        Future future = executor.submit(callable);
        System.out.println("打印线程池返回值:" + future.get());
    }
}

class MyCallable implements Callable<String>{
    @Override
    public String call() throws Exception {
        return "测试返回值";
    }
}

执行完成后,会打印出以下结果:

打印线程池返回值:测试返回值

可见,线程池执行完异步线程任务,我们是可以获取到异步线程里的返回值。

那么,ExecutorService、Callable、Future实现有返回结果的多线程是如何实现的呢?

首先,我们需要创建一个实现函数式接口Callable的类,该Callable接口只定义了一个被泛型修饰的call方法,这意味着,需要返回什么类型的值可以由具体实现类来定义——

@FunctionalInterface
public interface Callable<V> {
    V call() throws Exception;
}

因此,我自定义了一个实现Callable接口的类,该类的重写了call方法,我们在执行多线程时希望返回什么样的结果,就可以在该重写的call方法定义。

class MyCallable implements Callable<String>{
    @Override
    public String call() throws Exception {
        return "测试返回值";
    }
}

在自定义的MyCallable类中,我在call方法里设置一个很简单的String返回值 “测试返回值”,这意味着,我是希望在线程池执行完异步线程任务时,可以返回“测试返回值”这个字符串给我。

接下来,我们就可以创建该MyCallable类的对象,然后通过executor.submit(callable)丢到线程池里,线程池里会利用空闲线程来帮我们执行一个异步线程任务。

ExecutorService executor = Executors.newSingleThreadExecutor();
Callable callable = new MyCallable();
Future future = executor.submit(callable);

值得注意一点是,若需要实现获取线程返回值的效果,只能通过executor.submit(callable)去执行,而不能通过executor.execute(Runnable command)执行,因为executor.execute(Runnable command)只能传入实现Runnable 接口的对象,但这类对象是不具备返回线程效果的功能。

进入到executor.submit(callable)底层,具体实现在AbstractExecutorService类中。可以看到,执行到submit方法内部时,会将我们传进来的new MyCallable()对象作为参数传入到newTaskFor(task)方法里——

public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}

这个newTaskFor(task)方法内部具体实现,是将new MyCallable()对象传入构造器中,生成了一个FutureTask对象。

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable);
}

这个FutureTask对象实现RunableFuture接口,这个RunableFuture接口又继承了Runnable,说明FutureTask类内部会实现一个run方法,然后本身就可以当做一个Runnable线程任务,借助线程Thread(new FutureTask(.....)).start()方式开启一个新线程,去异步执行其内部实现的run方法逻辑。

public class FutureTask<V> implements RunnableFuture<V>{.....}

public interface RunnableFuture<V> extends Runnable, Future<V> {
    /**
     * Sets this Future to the result of its computation
     * unless it has been cancelled.
     */
    void run();
}

分析到这里,可以知道FutureTask的核心方法一定是run方法,线程执行start方法后,最后会去调用FutureTask的run方法。在讲解这个run方法前,我们先去看一下创建FutureTask的初始化构造方法底层逻辑new FutureTask(callable) ——

public class FutureTask<V> implements RunnableFuture<V> {

private Callable<V> callable;

......//省略其余源码

public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    //通过构造方法初始化Callable<V> callable赋值
    this.callable = callable;
    this.state = NEW;       // ensure visibility of callable
}

......//省略其余源码
}

可以看到,FutureTask(Callable callable)构造器,主要是将我们先前创建的new MyCallable()对象传进来,赋值给FutureTask内部定义的Callable callable引用,实现子类对象指向父类引用。这一点很关键,这就意味着,在初始化创建FutureTask对象后,我们是可以通过callable.call()来调用我们自定义设置可以返回“测试返回值”的call方法,这不就是我们希望在异步线程执行完后能够返回的值吗?

我们不妨猜测一下整体返数主流程,在Thread(new FutureTask(.....)).start()开启一个线程后,当线程获得了CPU时间片,就会去执行FutureTask对象里的run方法,这时run方法里可以通过callable.call()调用到我们自定义的MyCallable#call()方法,进而得到方法返回值 “测试返回值”——到这一步,只需要将这个返回值赋值给FutureTask里某个定义的对象属性,那么,在主线程在通过获取FutureTask里被赋值的X对象属性值,不就可以拿到返回字符串值 “测试返回值”了吗?

实现上,主体流程确实是这样,只不过忽略了一些细节而已。

接下来,让我们看一下FutureTask的run方法——

public void run() {
    //如果状态不是NEW或者设置runner为当前线程时,说明FutureTask任务已经取消,无法继续执行
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        //在该文中,callable被赋值为指向我们定义的new MyCallable()对象引用
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                //c.call最后会调用new MyCallable()的call()方法,得到字符串返回值“测试返回值”给result
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            //正常执行完c.call()方法时,ran值为true,说明会执行set(result)方法。
            if (ran)
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

根据以上源码简单分析,可以看到run方法当中,最终确实会执行new MyCallable()的call()方法,得到字符串返回值“测试返回值”给result,然后执行set(result)方法,根据set方法名就不难猜出,这是一个会赋值给某个字段的方法。

这里分析会忽略一些状态值的讲解,这块会包括线程的取消、终止等内容,后面我会出一片专门针对FutureTask源码分析的文章再介绍,本文主要还是介绍异步线程返回结果的主要原理。

沿着以上分析,追踪至set(result)方法里——

protected void set(V v) {
    //通过CAS原子操作,将运行的线程设置为COMPLETING,说明线程已经执行完成中
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        //若CAS原子比较赋值成功,说明线程可以被正常执行完成的话,然后将result结果值赋值给outcome
        outcome = v;
        //线程正常完成结束
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        finishCompletion();
    }
}

这个方法的主要是,若该线程执行能够正常完成话,就将得到的返回值赋值给outcome,这个outcome是FutureTask的一个Object变量——

private Object outcome;

至此,就完成了流程的这一步——

最后,就是执行主线程的根据ftask.get()获取执行完成的值,这个get可以设置超时时间,例如 ftask.get(2,TimeUnit.SECONDS)表示超过2秒还没有获取到线程返回值的话,就直接结束该get方法,继续主线程往下执行。

System.out.println("打印线程池返回值:" + ftask.get(2,TimeUnit.SECONDS));

进入到get方法,可以看到当状态在s <= COMPLETING时,表示任务还没有执行完,就会去执行awaitDone(false, 0L)方法,这个方法表示,将一直做死循环等待线程执行完成,才会跳出等待循环继续往下走。若设置了超时时间,例如ftask.get(2,TimeUnit.SECONDS)),就会在awaitDone方法循环至2秒,在2秒内发现线程状态被设置为正常完成时,就会跳出循环,若2秒后线程没有执行完成,也会强制跳出循环了,但这种情况将无法获取到线程结果值。

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        //循环等待线程执行状态
        s = awaitDone(false, 0L);
    return report(s);
}

最后就是report(s)方法,可以看到outcome值最终赋值给Object x,若s==NORMAL表示线程任务已经正常完成结束,就可以根据我们定义的类型进行泛型转换返回,我们定义的是String字符串类型,故而会返回字符串值,也就是 “测试返回值”。

private V report(int s) throws ExecutionException {
    Object x = outcome;
    if (s == NORMAL)
        //返回线程任务执行结果
        return (V)x;
    if (s >= CANCELLED)
        throw new CancellationException();
    throw new ExecutionException((Throwable)x);
}

你看,最后就能获取到了异步线程执行的结果返回给main主线程——

以上就是执行线程任务run方法后,如何将线程任务结果返回给主线程,其实,还少一个地方补充,就是如何将FutureTask任务丢给线程执行,我们这里用到了线程池, 但是execute(ftask)底层同样是使用一个了线程通过执行start方法开启一个线程,这个新运行的线程最终会执行FutureTask的run方法。

public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}

可以简单优化下,直接用一个线程演示该案例,这样看着更好理解些,当时,生产上是不会有这样直接用一个线程来执行的,更多是通过原生线程池——

public static void main(String[] args) throws Exception{
    Callable callable = new MyCallable();
    RunnableFuture<String> ftask = new FutureTask<String>(callable);
    new Thread(ftask).start();
    System.out.println("打印线程池返回值:" + ftask.get());
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/339222.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

KMP 算法

1 应用场景-字符串匹配问题  字符串匹配问题&#xff1a;&#xff1a; 有一个字符串 str1 ““硅硅谷 尚硅谷你尚硅 尚硅谷你尚硅谷你尚硅你好””&#xff0c;和一个子串 str2“尚硅谷你尚硅 你” 2) 现在要判断 str1 是否含有 str2, 如果存在&#xff0c;就返回第一次出现…

数据与C(limits.h数据常数介绍)

本章简单的介绍一下limits.h的数据常量&#xff0c;这里简单了解一下就好了 目录 一.limits.h 二.float.h头文件 一.limits.h CHAR_BIT char类型的位数 CHARMAX char类型的最大值 CHAR_MIN char类型的最小值 SCHAR_MAX signed char类型的最大…

SpringBoot图片上传和访问路径映射

图片上传和静态资源映射编写controller层接口上传到文件夹相关配置1 application.properties配置文件&#xff1a;2 Constant类&#xff1a;文件的资源映射配置WebMvcConfigurer的继承类注意测试编写controller层接口 ApiOperation("图片上传功能")PostMapping(&quo…

Java笔记-volatile和AtomicInteger

目录1. volatile1.1.什么是volatile1.2.JMM-Java内存模型2 验证volatile的特性2.1 可见性2.2.验证volatile不保证原子性2.3 volatile实现禁止指令重排序3.使用AtomicInteger解决volatile的不能实现原子性的问题3.2 AtomicInteger的方法说明&#xff1a;3.3 CAS3.4 应用1. volat…

linux-进程1-进程概述

写在最前 记录一下linux的进程学习专题 1. 程序和进程的区别 1.1 程序 程序是包含一系列信息的文件&#xff0c;这些信息描述了如何在运行时创建一个进程&#xff1a; 二进制格式标识&#xff1a;每个程序文件都包含用于描述可执行文件格式的元信息。内核利用此信息来解 释文…

Redis实战-session共享之修改登录拦截器

在上一篇中Redis实战之session共享&#xff0c;我们知道了通过Redis实现session共享了&#xff0c;那么token怎么续命呢&#xff1f;怎么刷新用户呢&#xff1f;本来咱们就通过拦截器来实现这两个功能。 登录拦截器优化&#xff1a; 先来看看现在拦截器情况&#xff1a; 拦截…

JavaScipt基础学习(1)

1. JavaScript特点 JavaScript是脚本编写语言&#xff1b;所有主流浏览器都支持JavaScript&#xff1b;JavaScript基于对象语言&#xff1b;JavaScriptb变量类型是弱类型&#xff0c;没有如Java一样严格的数据类型&#xff1b;变量是弱类型的。因此定义变量时&#xff0c;只使…

WindowsServer服务器系列:部署FTP文件服务

1、点击“开始”菜单&#xff0c;选择“服务器管理器” 2、在接下来弹出页面中选择“添加角色和功能” 3、接下来点击“下一步” 4、接下来选择“基于角色或基于功能的安装”并点击“下一步” 5、选择“从服务器池中选择服务器”并点击“下一步” 6、接下来选中“Web 服务器(II…

【数模比赛】2023美国大学生数学建模比赛(思路、代码......)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…

虚拟存储管理(6)

虚拟存储管理 前面介绍的存储管理方案要求作业全部装入内存才可运行。但这会出现两种情况&#xff1a; 有的作业因太大&#xff0c;内存装不下而无法运行。系统中作业数太多&#xff0c;因系统容量有限只能让少数作业先运行。 1 局部性原理 定义&#xff1a; 程序执行时&a…

TCP网络编程中connect()、listen()和accept()三者之间的关系

基于 TCP 的网络编程开发分为服务器端和客户端两部分&#xff0c;常见的核心步骤和流程如下&#xff1a; connect()函数 对于客户端的 connect() 函数&#xff0c;该函数的功能为客户端主动连接服务器&#xff0c;建立连接是通过三次握手&#xff0c;而这个连接的过程是由内核…

LeetCode题目笔记——24. 两两交换链表中的节点

文章目录题目描述题目链接题目难度——中等方法一&#xff1a;迭代代码/C代码/python方法二&#xff1a;递归代码/C总结题目描述 或许这也是个经典的面试题&#xff0c;记录一手 给你一个链表&#xff0c;两两交换其中相邻的节点&#xff0c;并返回交换后链表的头节点。你必须在…

模电学习6. 常用的三极管放大电路

模电学习6. 常用的三极管放大电路一、判断三极管的工作状态1. 正偏与反偏的概念2. 工作状态的简单判断二、三种重要的放大电路1. 共射电路2. 共集电极放大电路3. 共基极放大电路一、判断三极管的工作状态 1. 正偏与反偏的概念 晶体管分P区和N区&#xff0c; 当P区电压大于N区…

[设计模式] 建造者模式

文章目录什么是建造者模式建造者模式建造者模式中的角色UML类图代码实现建造者模式与工厂模式的区别什么是建造者模式 建造者模式(Builder Pattern)是一种创建型的设计模式&#xff0c;它将一个复杂对象的构建与它的表示分离&#xff0c;也就是复杂的构建隐藏起来&#xff0c;…

即时通讯系列-4-如何设计写扩散下的同步协议方案

1. 背景信息 上篇提到了, IM协议层是主要解决会话和消息的同步, 在实现上, 以推模式为主, 拉模式为辅. 本文Agenda: (How)如何同步(How)如何设计同步位点如何设计 Gap过大(SyncGapOverflow) 机制如何设计Ack机制总结 提示: 本系列文章不会单纯的给出结论, 希望能够分享的是&…

SpringCloud-Netflix学习笔记13——Zuul路由网关

什么是Zuul? Zuul包含了对请求的路由和过滤两个最主要的功能。 其中路由功能负责将外部请求转发到具体的微服务实例上&#xff0c;是实现外部访问统一入口的基础&#xff0c;而过滤器功能则负责对请求的处理过程进行干预&#xff0c;是实现请求校验&#xff0c;服务聚合等功能…

最详细教你注册 ChatGPT,不会来找我

超强人工智能 ChatGPT 震撼来袭&#xff0c;它是美国人工智能研究实验室 OpenAI 新推出的一种自然语言处理工具&#xff0c;不想来体验一下嘛&#xff01;最详细教程手把手教你注册&#xff0c;不会来找我&#xff01; 准备工作 一个可以科学上网的工具&#xff0c;提供非 Ch…

文献阅读笔记 # CodeBERT: A Pre-Trained Model for Programming and Natural Languages

《CodeBERT: A Pre-Trained Model for Programming and Natural Languages》EMNLP 2020 (CCF-B)作者主要是来自哈工大、中山大学的 MSRA 实习生和 MSRA、哈工大的研究员。资源&#xff1a;code | pdf相关资源&#xff1a;RoBERTa-base | CodeNN词汇&#xff1a; bimodal: 双模态…

嵌入式设备搭建NFS环境(服务器/客户端、源码下载编译、文件系统适配、内核适配)

1、什么是nfs (1)NFS(Network File System)是网络文件系统&#xff0c;能让使用者访问网络上别处的文件就像在使用自己的计算机一样&#xff1b; (2)NFS是基于UDP/IP协议的应用&#xff0c;其实现主要是采用远程过程调用RPC机制&#xff0c;RPC提供了一组与机器、操作系统以及低…

CAS详解.

CAS这个机制就给实现线程安全版本的代码&#xff0c;提供了一个新的思路&#xff0c;之前通过加锁&#xff0c;把多个指令打包成整体&#xff0c;来实现线程安全。现在就可以考虑直接基与CAS来实现一些修改操作&#xff0c;也能保证线程安全&#xff08;不需要加锁&#xff09;…