并发-Executor框架笔记

news2025/1/19 10:35:08

Executor框架

jdk5开始,把工作单元与执行机制分离开来,工作单元包括Runable和Callable,执行机制由Executor框架来提供。

Executor框架简介

Executor框架的两级调度模型

  • Java线程被一对一映射为本地操作系统线程
    • java线程启动会创建一个本地操作系统线程
    • java线程终止操作系统线程也会被回收
    • 操作系统会调度所有线程并将它们分配给可用的cpu
  • 在上层,java多线程程序通常把应用分解为若干任务,然后使用用户级调度器将这些任务映射为固定数量的线程。在底层,操作系统内核将这些线程映射到硬件处理器上
  • 应用通过Executor框架控制上层调度,下层由操作系统内核控制,下层调度不受应用程序控制。

在这里插入图片描述

Executor框架的结构与成员

Executor框架结构
  • 主要由任务,任务执行和异步计算结果组成

    • 任务:包括被执行任务需要实现的接口:Runnable和Callable
    • 任务的执行:包括任务执行机制的核心接口Executor,以及继承自Executor的ExecutorServie接口。(ThreadPoolExecutor和ScheduledThreadPoolExecutor两个关键类实现了)
    • 异步计算的结果:包括接口Future和实现Future接口的FutureTask类
  • 类和接口

  • 在这里插入图片描述

    • Executor是一个接口,是Executor框架的基础,将任务的提交与任务的执行分离开来
    • ThreadPoolExecutor是线程池的核心实现类,用来执行被提交的任务
    • ScheduledThreadPoolExecutor是一个实现类,可以在给定的延迟后运行命令,或者定期执行命令。比Timer更灵活,功能更强大。
    • Future接口和实现Future接口的FutureTask类,代表异步计算的结果
    • Runnable接口和Callable接口的实现类,都可以被ThreadPoolExecutor或ScheduledThreadPoolExecutor执行
  • 使用:

    • 主线程创建实现Runnable或Callable接口的任务对象,工具类Executors可以把一个Runnable对象封装为一个Callable对象(Executors.callable(Runnable task)或Executors.callable(Runnable task,Object result))
    • 可以把Runnable对象直接交给ExecutorService执行(ExecutorService.execute(Runnable command)),或者吧Runnable对象或Callable对象提交给ExecutorService执行(ExecutorService.submit(Runnable task)或ExecutorService.submit(Callable task))
    • 执行ExecutorService.submit(…),将会返回一个实现Future接口的对象,也可以创建FutureTask然后直接交给ExecutorService执行
    • 主线程可以执行FutureTask.get()方法来等待任务执行完成。主线程也可以执行FutureTask.cancel来取消此任务
Executor框架的成员
  • ThreadPoolExecutor:通常使用工厂类Executors创建

    • FixedThreadPool:创建固定线程数的FixedThreadPool

      • 适用于为了满足资源管理的需求,需要限制当前线程数量的应用场景,适用于负载比较重的服务器

      • public static ExecutorService newFixedThreadPool(int nThreads)
        public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory)
        
    • SingleThreadExecutor:创建单个线程

      • 适用于需要保证顺序地执行各个任务,并且在任意时间点,不会有多个线程是活动的应用场景

        public static ExecutorService newSingleThreadExecutor()
        public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory)
        
    • CachedThreadPool:创建一个会根据需要创建新线程

      • 大小无界的线程池,适用于执行很多短期异步任务的小程序,或负载教轻的服务器

      • public static ExecutorService newCachedThreadPool()
        public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory)
        
  • ScheduledThreadPoolExecutor:通常使用工厂类Executors创建

    • 创建ScheduledThreadPoolExecutor:包含若干线程的ScheduledThreadPoolExecutor

      • 适用于需要多个后台线程执行周期任务,同时为了满足资源管理的需求需要限制后台线程数量的应用场景

      • public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
        public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory)
        
    • 创建SingleThreadScheduledExecutor:包含一个线程的ScheduledThreadPoolExecutor

      • 适用于需要单个后台线程执行周期任务,同时需要保证顺序地执行各个任务的应用场景

      • public static ScheduledExecutorService newSingleThreadScheduledExecutor(int corePoolSize)
        public static ScheduledExecutorService newSingleThreadScheduledExecutor(int corePoolSize, ThreadFactory threadFactory)
        
  • Future接口

    • Future接口和Future接口的FutureTask类用来表示异步计算的结果

    • 把Runnable接口或Callable接口的实现类提交给ThreadPoolExecutors或ScheduledThreadPoolExecutors时,会返回给一个FutureTask对象

    • <T> Future<T> submit(Callable<T> task)
      <T> Future<T> submit(Runnable task, T result)
      Future<> submit(Runnable task)
      
  • Runnable接口和Callable接口

    • Runnable不会返回结果,Callable会返回结果

    • 可以把Runnable包装成Callable

    • public static Callable<Object> callable(Runnable task)
      
    • 可以把Runnable和一个待返回的结果包装成一个Callable

    • public static <T> Callable<T> callable(Runnable task, T result)
      
    • 把callable对象提交给执行时,submit会返回一个FutureTask对象

      • 执行get方法等待任务执行完成,任务执行完成后get方法将返回该任务的结果

ThreadPoolExecutor详解

构成组件

  • corePool:核心线程池大小
  • maximumPool:最大线程池大小
  • BlockingQueue:用来暂时保存任务的工作队列
  • RejectedExecutionHandler:当ThreadPoolExecutor已经关闭或者饱和时,execute方法将要调用的Handler

创建类型

  • FixedThreadPool
  • SingleThreadExecutor
  • CachedThreadPool

FixedThreadPool详解

固定线程数线程池

public static ExecutorService newFixedThreadPool(int nThreads){
    return new ThreadPoolExecutor(nThreads,nThreads, 0L,TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())
}
  • corePoolSize 和maximumPoolSize都被设置为创建FixedThreadPool时指定的参数
  • 当啊线程池中的线程数大于corePoolSize时,keepAliveTime为多余的空闲线程等待新任务的最长时间,超过这个时间后多余的线程将被终止
  • keepAliveTime设置为0,多余空闲线程会被立即终止

流程

  1. 如果当前运行线程数少于corePoolSize,则创建新线程来执行任务
  2. 在线程池完成预热后(当前运行的线程数等于corePoolSize),将任务加入LinkedBlockingQueue
  3. 线程执行完1中的任务后,会在循环中反复从LinkedBlockingQueue获取任务执行
  • 使用无界队列LinkedBlockingQueue作为线程池工作队列,使用无界队列作为工作队列会对线程池带来影响
    • 当线程池中的线程数达到corePoolSize后,新任务将在无界队列中等待,因此线程池中的线程数不会超过corePoolSize
    • 由于上条,使用无界队列时,maximumPoolSize将是一个无效参数
    • 由于上条和上上条,使用无界队列时keepAliveTIme将是一个无效参数
    • 由于使用无界队列,运行中的FixedThreadPool不会拒绝任务

SingleThreadExecutor详解

使用单个worker线程的Executor

public static ExecutorService newSingleThreadExecutor(){
    return new ThreadPoolExecutor(1,1, 0L,TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())
}
  • corePoolSize 和maximumPoolSize都被设置为1
  • keepAliveTime设置为0,多余空闲线程会被立即终止
  • 使用无界队列LinkedBlockingQueue作为线程池的工作队列

工作流程

  1. 如果当前运行的线程数少于corePoolSize,则创建一个新线程来执行任务
  2. 在线程池完成预热后,将任务加入LinkedBlockingQueue
  3. 线程执行完1中的任务后,会在一个无限循环中反复从LinkedBlockingQueue获取任务来执行

CachedThreadPool详解

根据需要创建新线程的线程池

public static ExecutorService newCachedThreadPool(int nThreads){
    return new ThreadPoolExecutor(0,Integer.MAX_VALUE, 60L,TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>())
}
  • corePoolSize设置为0
  • maximumPoolSize被设置为Integer.MAX_VALUE
  • keepAliveTime设置为60L,线程池中的空闲线程等待新任务的最长时间是60秒,空闲线程超过60秒将会被终止
  • 使用没有容量的SynchronousQueue作为线程池的工作队列
  • 如果主线程提交的任务速度高于maximumPool中线程处理任务的速度时,CachedThreadPool会不断创建新线程,极端情况下,会因为创建过多线程为耗尽cpu和内存资源

流程:

  1. 首先执行SynchronousQueue.offer(Runnable task)。如果当前maximumPool中有空闲线程正则执行SynchronousQueue.poll,那么主线程执行offer操作与空闲线程执行的poll操作配对成功,主线程把任务交给空闲线程执行,execute()方法执行完成;否则执行步骤2
  2. 当初始maximumPool为空,或者maximumPool中当前没有空闲线程时,将没有线程执行SynchronousQueue.poll.这种情况下,步骤1 将失败,此时CachedThreadPool会创建一个新线程执行任务,execute方法执行完成
  3. 在步骤2中新创建将任务执行完后,会执行SynchronousQueue.poll,这个poll操作会让空闲线程最多在SynchronousQueue中等待60s,如果60s内主线程提交了一个新任务,那么这个空闲线程将执行主线程提交的新任务,否则这个空闲线程将终止。由于空闲60秒的空闲线程会被终止,因此长时间保持空闲的CachedThreadPool不会使用任何资源。

ScheduledThreadPoolExecutor详解

  • 继承自ThreadPoolExecutor
  • 主要用来在给定的延迟之后运行任务,会定期执行任务。
  • 可以在构造函数中指定多个对应的后台线程数

ScheduledThreadPoolExecutor运行机制

  • DelayQueue是一个无界队列。

主要两部分

  • 当调用ScheduledThreadPoolExecutor的scheduleAtFixedRate方法或者scheduleWithFixedDelay方法时,会向ScheduledThreadPoolExecutor的DelayQueue添加一个实现了RunnableScheduledFuture接口的ScheduledFutureTask
  • 线程池中的线程从DelayQueue中获取ScheduledFutureTask,然后执行任务。

ScheduledThreadPoolExecutor为了实现周期性的执行任务,对ThreadPoolExecutor做了如下修改

  • 使用DelayQueue作为任务队列
  • 获取任务的方式不同
  • 执行周期任务后,增加了额外的处理

ScheduledThreadPoolExecutor的实现

ScheduledFutureTask成员变量:

  • long time:表示这个任务将要被执行的具体时间
  • long sequenceNumber:表示这个任务被添加到ScheduledThreadPoolExecutor中的序号
  • long period:表示任务执行的间隔周期

DelayQueue封装了一个优先级队列,这个优先级队列会将队列的任务根据time排列,小的在前,如果time相同,比较sequenceNumber,小的在前

ScheduledThreadPoolExecutor执行某个周期任务步骤

  • 线程从DelayQueue中获取已到期的ScheduledFutureTask,到期任务是指ScheduledFutureTask的time大于等于当前时间
    • 获取Lock
    • 获取周期任务
      • 如果PriorityQueue为空,当前线程到Condition中等待
      • 如果PriorityQueue的头元素的time时间比当前时间大,到condition中等待到time时间
      • 获取PriorityQueue的头元素,如果不为空,则唤醒condition中等待的所有线程
    • 释放Lock
  • 线程执行ScheduledFutureTask
  • 线程修改ScheduledFutureTask的time后边变为下次将要被执行的时间
  • 线程把这个修改time之后的ScheduledFutureTask放回到DelayQueue中
    • 获取Lock
    • 添加任务
      • 向PriorityQueue添加任务
      • 如果添加的任务是头元素,唤醒Condition中等待的所有线程
    • 释放Lock

FutureTask详解

Future接口和实现Future接口的FutureTask类,代表异步计算的结果

简介

  • FutureTask除了实现Future接口外,还实现了Runnable接口。
  • FutureTask可以交给Executor执行,也可以由调用线程直接执行
  • FutureTask可以处于3种状态
    • 未启动:run方法还没有被执行前
      • 执行FutureTask.get将导致调用线程阻塞
      • 执行FutureTask.cancel将导致此任务永远不会被执行
    • 已启动:run方法被执行过程中
      • 执行FutureTask.get将导致调用线程阻塞
      • 执行FutureTask.cancel(true)将以中断执行此任务线程的方式来试图停止任务
      • 执行FutureTask.cancel(false)将不会对正在执行此任务的线程产生影响
    • 已完成:run方法执行完后正常结束,或被取消,或执行run方法时抛出异常而异常结束
      • 执行FutureTask.get将导致调用线程立即返回结果或抛出异常
      • 执行FutureTask.cancel()返回false

使用

  • 可以把FutureTask交给Executor执行
  • 可以通过ExecutorService.submit返回一个FutureTask,然后执行FutureTask.get或cancel方法
  • 也可以单独使用FutureTask

当一个线程需要等待另一个线程把某个任务执行完后才能执行,此时可以使用FutureTask

实现

  • 基于AQS实现,包含两种类型操作

    • 至少一个acquire操作,阻塞调用线程,除非直到AQS状态允许这线程继续执行,FutureTask的acquire操作为get方法调用
    • 至少一个release操作,这个操作改变AQS的状态,改变后的状态可以允许一个或多个阻塞线程被解除阻塞,FutureTask的release操作包括run和cancel
  • Sync是FutureTask的内部私有类,继承自AQS,FutureTask的所有公有方法都直接委托给了内部私有Sync

  • FutureTask.get方法会用AQS的acquireSharedInterruptibly方法,执行过程

    • 调用AQS的acquireSharedInterryptibly方法,
      • 回调在子类Sync中实现的tryAcquireShared()方法来判断acquire操作是否可以成功
        • 成功条件:state为执行完成状态RAN或已取消状态CANCELLED,且runner不为null
    • 如果成功则get方法立即返回,如果失败则到线程等待队列中去等待其他线程执行release操作
    • 当其他线程执行release操作唤醒当前线程后,当前线程再次执行tryAcquireShared()将返回正值1,当前线程将离开线程等待队列并唤醒它的后继线程
    • 最后返回计算的结果或抛出异常

    FutureTask.run方法

    • 执行在构造函数中指定的任务(Callable.call)
    • 以原子方式来更新同步,如果这个原则操作成功,就设置代表计算结果的变量result的值为callable.call的返回值,然后调用AQS.releaseShared
    • AQS.releaseShared首先会回调在子类Sync中实现的tryReleaseShared来执行release操作,AQS.releaseShared,然后唤醒线程等待队列中的第一个线程
    • 调用FutureTask.done1

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/997439.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Linux单列模式实现线程池

目录 一、单列模式 1.1 单列模式概念以及实现条件 1.2 饿汉模式 1.1.1 饿汉模式代码实现 1.1.2 饿汉模式特征和优缺点 1.3 懒汉模式 1.3.1 懒汉模式代码实现 1.3.2 懒汉模式特征以及优缺点 二、线程池 2.1 线程池概念 2.2 实现简单线程池逻辑 2.3 模拟实现懒汉模式线程…

【八大经典排序算法】:直接插入排序、希尔排序实现 ---> 性能大比拼!!!

【八大经典排序算法】&#xff1a;直接插入排序、希尔排序实现 ---> 性能大比拼&#xff01;&#xff01;&#xff01; 一、 直接插入排序1.1 插入排序原理1.2 代码实现1.3 直接插入排序特点总结 二、希尔排序 ( 缩小增量排序 )2.1 希尔排序原理2.2 代码实现2.3 希尔排序特点…

UE5、CesiumForUnreal实现瓦片坐标信息图层效果

文章目录 1.实现目标2.实现过程2.1 原理简介2.2 cesium-native改造2.3 CesiumForUnreal改造2.4 运行测试3.参考资料1.实现目标 参考CesiumJs的TileCoordinatesImageryProvider,在CesiumForUnreal中也实现瓦片坐标信息图层的效果,便于后面在调试地形和影像瓦片的加载调度等过…

【C++入门到精通】C++入门 ——搜索二叉树(二叉树进阶)

阅读导航 前言一、搜索二叉树简介1. 概念2. 基本操作⭕搜索操作&#x1f36a;搜索操作基本代码&#xff08;非递归&#xff09; ⭕插入操作&#x1f36a;插入操作基本代码&#xff08;非递归&#xff09; ⭕删除操作&#x1f36a;删除操作基本代码&#xff08;非递归&#xff0…

给老婆写的,每日自动推送暖心消息

文章の目录 一、起因二、环境准备三、创建nestjs项目四、控制器五、service服务层1、获取Access token2、组装模板消息数据3、获取下次发工资还有多少天4、获取距离下次结婚纪念日还有多少天5、获取距离下次生日还有多少天6、获取时间日期7、获取是第几个结婚纪念日8、获取相恋…

前端面试题JS篇(4)

浏览器缓存 浏览器缓存分为强缓存和协商缓存&#xff0c;当客户端请求某个资源时&#xff0c;获取缓存的流程如下&#xff1a; 先根据这个资源的一些 http header 判断它是否命中强缓存&#xff0c;如果命中&#xff0c;则直接从本地获取缓存资源&#xff0c;不会发请求到服务…

vivado xpm 使用和封装

vivado xpm 使用和封装 tools -> language templates

【JavaScript】WebAPI入门到实战

文章目录 一、WebAPI背景知识1. 什么是WebAPI&#xff1f;2. 什么是API&#xff1f; 二、DOM基本概念三、获取元素三、事件初识1. 点击事件2. 键盘事件 四、操作元素1. 获取/修改元素内容2. 获取/修改元素属性3. 获取/修改表单元素属性4. 获取/修改样式属性 五、操作节点1. 新增…

scratch还原轨迹 2023年5月中国电子学会图形化编程 少儿编程 scratch编程等级考试四级真题和答案解析

目录 scratch还原轨迹 一、题目要求 1、准备工作 2、功能实现 二、案例分析

Python:安装Flask web框架hello world

安装easy_install pip install distribute 安装pip easy_install pip 安装 virtualenv pip install virtualenv 激活Flask pip install Flask 创建web页面demo.py from flask import Flask app Flask(__name__)app.route(/) def hello_world():return Hello World! 2023if _…

基于springboot实现的rabbitmq消息确认

概述 RabbitMQ的消息确认有两种。 一种是消息发送确认。这种是用来确认生产者将消息发送给交换器&#xff0c;交换器传递给队列的过程中&#xff0c;消息是否成功投递。发送确认分为两步&#xff0c;一是确认是否到达交换器&#xff0c;二是确认是否到达队列。 第二种是消费接…

【APUE】标准I/O库

目录 1、简介 2、FILE对象 3、打开和关闭文件 3.1 fopen 3.2 fclose 4、输入输出流 4.1 fgetc 4.2 fputc 4.3 fgets 4.4 fputs 4.5 fread 4.6 fwrite 4.7 printf 族函数 4.8 scanf 族函数 5、文件指针操作 5.1 fseek 5.2 ftell 5.3 rewind 6、缓冲相关 6.…

安装samba服务器

1.实验目的 &#xff08;1&#xff09;了解SMB和NETBIOS的基本原理 &#xff08;2&#xff09;掌握Windows和Linux之间&#xff0c;Linux系统之间文件共享的基本方法。 2.实验内容 &#xff08;1&#xff09;安装samba服务器。 &#xff08;2&#xff09;配置samba服务器的…

Visual Studio 线性表的链式存储节点输出引发异常:读取访问权限冲突

问题&#xff1a; 写了一个线性表的链式存储想要输出&#xff0c;能够输出&#xff0c;但是会报错&#xff1a;读取访问权限冲突 分析&#xff1a; 当我们输出到最后倒数第二个节点时&#xff0c;p指向倒数第二个节点并输出&#xff1b; 下一轮循环&#xff1a;p指向倒数第二…

Helm Kubernetes Offline Deploy Rancher v2.7.5 Demo (helm 离线部署 rancher 实践)

文章目录 1. 简介2. 预备条件3. 选择 SSL 配置4. 离线安装的 Helm Chart 选项5. 下载介质6. 生成证书7. 镜像入库8. 安装 rancher9. 配置 nodeport10. 配置 ingress11. 界面访问11.1 首页预览11.2 查看集群信息11.3 查看项目空间11.4 查看节点信息 1. 简介 Rancher 是一个开源…

17-数据结构-查找-(顺序、折半、分块)

简介&#xff1a;查找&#xff0c;顾名思义&#xff0c;是我们处理数据时常用的操作之一。大概就是我们从表格中去搜索我们想要的东西&#xff0c;这个表格&#xff0c;就是所谓的查找表&#xff08;存储数据的表&#xff09;。而我们怎么设计查找&#xff0c;才可以让计算机更…

lv4 嵌入式开发-4 标准IO的读写(二进制方式)

目录 1 标准I/O – 按对象读写 2 标准I/O – 小结 3 标准I/O – 思考和练习 文本文件和二进制的区别&#xff1a; 存储的格式不同&#xff1a;文本文件只能存储文本。除了文本都是二进制文件。 补充计算机内码概念&#xff1a;文本符号在计算机内部的编码&#xff08;计算…

2023/09/10

文章目录 1. 使用Vue单页面全局变量注意事项2. 伪元素和伪类3. Vue3中定义数组通常使用ref4. Vue Router的 $router 和 $route5. Vue路由中的query和params的区别6. vue3defineExpose({})属性不能重命名&#xff0c;方法可以重命名7. 显卡共享内存的原理8. deltaY9. 快速生成方…

电池2RC模型 + 开路电压法 + 安时积分 + 电池精度测试 + HPPC脉冲

电池2RC模型 电池2RC模型是一种等效电路模型&#xff0c;用于描述电池的动态特性。该模型将电池视为一个理想电容器和一个理想电阻的并联&#xff0c;其中理想电容器代表电池的化学反应&#xff0c;理想电阻代表电池的内阻。该模型适用于描述电池的充电和放电过程。 开路电压…

Java中如何判断字符串输入[hello,world]是不是字符串数组参数

Java中如何判断字符串输入[hello,world]是不是字符串数组参数&#xff1f; 在Java中&#xff0c;可以使用正则表达式来判断一个字符串是否符合字符串数组的参数格式。你可以使用matches()方法和对应的正则表达式进行判断。 以下是一个示例代码&#xff1a; public static bo…