【源码解析】聊聊线程池 实现原理与源码深度解析(二)

news2024/9/29 5:26:14

AbstractExecutorService

上一篇文章中,主要介绍了AbstractExecutorService的线程执行的核心流程,execute() 这个方法显然是没有返回执行任务的结果,如果我们需要获取任务执行的结果,怎么办?

Callable 就是一个可以获取线程执行的结果。

public abstract class AbstractExecutorService implements ExecutorService {

  	/*
     * 将任务包装成FutureTask任务。带返回值参数的
     */
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }

    /**
    ** 不带返回值的
    **/
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }

    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        //1.将任务包装成RunableFuture对象,由于RunnableFuture是实现Runable类,所以execute的参数是一个可拓展的类型
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        //2,交给具体的执行器进行实现
        execute(ftask);
        return ftask;
    }

    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }

    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        //将任务装成成一个FutureTask任务
        RunnableFuture<T> ftask = newTaskFor(task);
        //执行任务
        execute(ftask);
        return ftask;
    }
  }

submit其实是一个重载的方法,分别是一个task,以及可以传递获取结果的任务,以及使用callable。

demo

从源码上看三个方法其实都是将任务进行了封装,然后调用线程池执行的核心方法

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Callable<Integer> resultCallable = new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                return 1 + 1;
            }
        };

        ExecutorService threadPool = Executors.newFixedThreadPool(1);
        Future<Integer> resultTask = threadPool.submit(resultCallable);
        System.out.println(resultTask.get());
        threadPool.shutdown();
    }

FutureTask

public class FutureTask<V> implements RunnableFuture<V> {
     
    /* NEW -> COMPLETING -> NORMAL
     * NEW -> COMPLETING -> EXCEPTIONAL
     * NEW -> CANCELLED
     * NEW -> INTERRUPTING -> INTERRUPTED
     */
    private volatile int state;
    private static final int NEW          = 0; // 初始化状态
    private static final int COMPLETING   = 1; // 结果计算完成或响应中断到赋值给返回值的状态
    private static final int NORMAL       = 2; // 任务正常完成,结果被set
    private static final int EXCEPTIONAL  = 3; // 任务抛出异常
    private static final int CANCELLED    = 4; // 任务被取消
    private static final int INTERRUPTING = 5; // 线程中断状态被设置为true 线程未响应中断
    private static final int INTERRUPTED  = 6; // 线程已被中断

    /** The underlying callable; nulled out after running */
    private Callable<V> callable; // 需要执行的任务
    /** The result to return or exception to throw from get() */
    // 执行callable的线程,调用FutureTask.run()方法通过CAS设置
    private Object outcome; // non-volatile, protected by state reads/writes
    /** The thread running the callable; CASed during run() */
    // 执行callable的线程,调用FutureTask.run()方法通过CAS设置
    private volatile Thread runner;
    /** Treiber stack of waiting threads */
    private volatile WaitNode waiters;

  
    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW; // 初始化状态是new      // ensure visibility of callable
    }
}
 /* 继承了Runnable ,因为线程池中执行的也是Runnbale的任务
 */
public interface RunnableFuture<V> extends Runnable, Future<V> {
    /**
     * Sets this Future to the result of its computation
     * unless it has been cancelled.
     */
    void run();
}

FutureTask 实现RunnableFuture,也间接实现了run方法。

重点

我们知道 execute(ftask); 本质就是利用线程池进行执行,而线程执行的时候,其实就是启动对应任务的run方法。

task.run();
		// 这里是什么时候调用的,其实是
    // execute(ftask)传入的任务 task.run()
    public void run() {
        //不是新建状态 直接中止
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    //核心,执行任务的call方法,你看就是调用普通的方法一样。
                    result = c.call();
                    //同步调用获取结果值
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    //设置结果值
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            //响应中断
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }
  • 判断当前任务状态,非NEW直接返回
  • 执行对应c.call() 其实就是执行callable中的call方法。
  • 将返回值set进去
    protected void set(V v) {
        //CAS 去设置当前任务执行状态 new-completing
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            //返回结果outcome
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            finishCompletion();
        }
    }

get

    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        //如果是在执行中,则等待一会
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        //返回结果
        return report(s);
    }

    /**
     * @throws CancellationException {@inheritDoc}
     */
    public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (unit == null)
            throw new NullPointerException();
        //设置了超时时间,则等待一定的时间,如果还没有获取到返回异常
        int s = state;
        if (s <= COMPLETING &&
            (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
            throw new TimeoutException();
        return report(s);
    }

    private V report(int s) throws ExecutionException {
        Object x = outcome;
        //执行完成 返回x结果
        if (s == NORMAL)
            return (V)x;
        //如果任务取消,返回异常
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }

awaitDone

    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            //如果线程执行interrupted,直接抛出异常,并且将任务移除
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }

            int s = state;
            //状态大于COMPLETING 说明完成了
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            //
            else if (s == COMPLETING) // cannot time out yet
                Thread.yield();
            else if (q == null)
                q = new WaitNode();
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                LockSupport.parkNanos(this, nanos);
            }
            else
                LockSupport.park(this);
        }
    }

小结

FutureTask是一个支持取消行为的异步任务执行器。该类实现了Future接口的方法。
如:

  1. 取消任务执行
  2. 查询任务是否执行完成
  3. 获取任务执行结果(”get“任务必须得执行完成才能获取结果,否则会阻塞直至任务完成)。

如果在当前线程中需要执行比较耗时的操作,但又不想阻塞当前线程时,可以把这些作业交给FutureTask,另开一个线程在后台完成,当当前线程将来需要时,就可以通过FutureTask对象获得后台作业的计算结果或者执行状态。

Future模式其实是多线程编程中常用的设计模式,主线程向另外一个线程提交任务,无需等待任务执行的结果,返回一个凭证,就是future,通过future.get()去获取结果。这个过程可能是阻塞的。

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1283891.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【yolov8】与yolov5的区别及改进详解

图像识别技术在物联网、智能监控等领域广泛应用。而深度学习中的目标检测技术&#xff0c;能够帮助我们对图像中的目标进行识别&#xff0c;进而实现自动化控制。目前&#xff0c;Yolov8和Yolov5是目标检测领域热门的模型。 yolo目标检测原理yolov5详解yolov8yolov8结构图Conv模…

RK3588+MCU机器人控制器解决方案

1 产品简介 XMP04A 是一款信迈科技基于 RK3588 设计的高性能、低功耗的边缘计算设备&#xff0c; 内置 NPU 算力可达 6.0TOPSINT8&#xff0c;以及具备强大的视频编解码能力&#xff0c;最高可支持 32 路 1080P30fps 解码和 16 路 1080P30fps 编码 &#xff0c;支持 4K12…

数据库管理-第120期 初探Halo数据库(202301201)

数据库管理-第120期 初探Halo数据库&#xff08;202301201&#xff09; 12月份正好也是第120期&#xff0c;新的一篇文章&#xff0c;尝试一条新的路线。其实吧&#xff0c;Halo&#xff08;羲和&#xff09;这个数据库我较早时间就听说过&#xff08;早于今年DTCC&#xff0c…

SpringBoot+SSM项目实战 苍穹外卖(3)

继续上一节的内容&#xff0c;本节完成菜品管理功能&#xff0c;包括公共字段自动填充、新增菜品、菜品分页查询、删除菜品、修改菜品。 目录 公共字段自动填充新增菜品文件上传实现新增菜品实现 useGeneratedKeys 菜品分页查询删除菜品修改菜品根据id查询菜品实现修改菜品实现…

【Go】Go语言基础内容

变量声明&#xff1a; 变量声明&#xff1a;在Go中&#xff0c;变量必须先声明然后再使用。声明变量使用 var 关键字&#xff0c;后面跟着变量名和类型&#xff0c;如下所示&#xff1a; var age int这行代码声明了一个名为 age 的整数变量。 变量初始化&#xff1a;您可以在声…

JFrog----SBOM清单包含哪些:软件透明度的关键

文章目录 SBOM清单包含哪些&#xff1a;软件透明度的关键引言SBOM清单的重要性SBOM清单包含的核心内容SBOM的创建和管理结论 软件物料清单&#xff08;SBOM&#xff09;是一个在软件供应链安全中越来越重要的组成部分。它基本上是一份清单&#xff0c;详细列出了在特定软件产品…

ENVI植被指数阈值法

植被指数阈值法提取纯净像元 首先用ENVI打开无人机遥感影像 1. 假彩色显示 打开数据管理工具&#xff0c;无人机的4波段为红边波段 2. 波段计算 打开band math&#xff0c;输入 float(b1-b2)/(b1b2) 选择对应波段 3. 阈值筛选 阈值按经验值选的0.7&#xff0c;ndvi…

从零开始实现神经网络(二)_CNN卷积神经网络

参考文章: 介绍卷积神经网络1 介绍卷积神经网络2 在过去的几年里&#xff0c;关于卷积神经网络&#xff08;CNN&#xff09;的讨论很多&#xff0c;特别是因为它们彻底改变了计算机视觉领域。在这篇文章中&#xff0c;我们将建立在神经网络的基本背景知识的基础上&#xff0c;探…

[GPT-1]论文实现:Improving Language Understanding by Generative Pre-Training

Efficient Graph-Based Image Segmentation 一、完整代码二、论文解读2.1 GPT架构2.2 GPT的训练方式Unsupervised pre_trainingSupervised fine_training 三、过程实现3.1 导包3.2 数据处理3.3 模型构建3.4 模型配置 四、整体总结 论文&#xff1a;Improving Language Understa…

android studio 打开flutter项目 出现 dart sdk is not configured

android studio 版本 flutter版本 解决方式 1 点击Open Dart setting 2 打勾Enable Dart support for the project 3 Dart SDK path 选择flutter/bin/cache/dart-sdk 4 打勾Enable Dart support for the following modules

【NI-RIO入门】Real Time(实时系统解释)

1.什么是实时系统&#xff1f; 实时系统可以非常精确可靠的执行需要特定时许要求的系统&#xff0c;对于许多工程项目来说&#xff0c;通用操作系统&#xff0c;例如WINDOWS等标准PC上运行测量和控制程序是无法精确控制计时的&#xff0c;这些系统很容易受用户的其他程序、图像…

【数据结构】——栈|队列(基本功能)

目录 栈 基本概念 栈的常见基本操作 栈的存储 ✌栈的基本操作实现 栈的构建 栈的初始化 入栈 打印栈 出栈 获取栈顶元素 获取栈的有效元素个数 判断栈是否为空 销毁栈 队列 基本概念 队列的常见基本操作 ✌队列的基本操作实现 队列的构建 初始化 入队列 出…

BUUCTF [GXYCTF2019]BabySQli 1 详解!(MD5与SQL之间的碰撞)

题目环境burp抓包 随便输入值 repeater放包 在注释那里发现某种编码 MMZFM422K5HDASKDN5TVU3SKOZRFGQRRMMZFM6KJJBSG6WSYJJWESSCWPJNFQSTVLFLTC3CJIQYGOSTZKJ2VSVZRNRFHOPJ5 看着像是base编码格式 通过测试发现是套加密&#xff08;二次加密&#xff09; 首先使用base32对此编码…

【LeetCode热题100】【双指针】三数之和

给你一个整数数组 nums &#xff0c;判断是否存在三元组 [nums[i], nums[j], nums[k]] 满足 i ! j、i ! k 且 j ! k &#xff0c;同时还满足 nums[i] nums[j] nums[k] 0 。请 你返回所有和为 0 且不重复的三元组。 注意&#xff1a;答案中不可以包含重复的三元组。 示例 …

C语言 操作符详解

C语言学习 目录 文章目录 前言 一、算术操作符 二、移位操作符 2.1 左移操作符 2.2 右移操作符 三、位操作符 3.1 按位与操作符 & 3.2 按位或操作符 | 3.3 按位异或操作符 ^ 四、赋值操作符 五、单目操作符 5.1 逻辑反操作符&#xff01; 5.2 正值、负值-操作符 5.3 取地址…

老铺黄金IPO:古法黄金的下半场,从高端走向大众?

当前&#xff0c;国内“掘金热”持续走高。据中国黄金协会统计&#xff0c;2023年前三季度全国黄金消费835.07吨&#xff0c;同比增长7.32%。其中黄金首饰552.04吨&#xff0c;同比增长5.72%。 在市场需求带动下&#xff0c;老铺黄金这家专注古法黄金经营的企业今年上半年业绩…

ubuntu 创建conda 环境失败 HTTP 000 CONNECTION FAILED

如有帮助点赞收藏关注&#xff01; 如需转载&#xff0c;请注明出处&#xff01; 现在内存分配好了&#xff0c;创建一个专门的conda环境处理文件&#xff0c;报错了&#xff0c;创建不成功&#xff01; 什么情况&#xff0c;之前明明可以的。 百度吧。 参照一些博客修改了文档…

java synchronized详解

背景 在多线程环境下同时访问共享资源会出现一些数据问题&#xff0c;此关键字就是用来保证线程安全的解决这一问题。 内存可见的问题 在了解synchronized之前先了解一下java内存模型&#xff0c;如下图&#xff1a; 线程1去主内存获取x的值读入本地内存此时x的值为1&…

Linux(14):进程管理

一个程序被加载到内存当中运作&#xff0c;那么在内存内的那个数据就被称为进程(process)。 进程是操作系统上非常重要的概念&#xff0c;所有系统上面跑的数据都会以进程的型态存在。 进程 在 Linux底下所有的指令与能够进行的动作都与权限有关&#xff0c;而系统如何判定权…

大数据技术学习笔记(四)—— HDFS

目录 1 HDFS 概述1.1 HDFS 背景与定义1.2 HDFS 优缺点1.3 HDFS 组成架构1.4 HDFS 文件块大小 2 HDFS的shell操作2.1 上传2.2 下载2.3 HDFS直接操作 3 HDFS的客户端操作3.1 Windows 环境准备3.2 获取 HDFS 的客户端连接对象3.3 HDFS文件上传3.4 HDFS文件下载3.5 HDFS删除文件和目…