11.定时任务定时线程池详解

news2025/1/24 2:25:17
3.1 新增定时任务池

11.定时任务&定时线程池详解

​ 当我们不用任务框架时,我们想自己写一个定时任务时,我们能想起那个工具类呢?Timer ?还有吗?不知道了,下面我们要讲下ScheduledThreadPoolExecutor,定时任务线程池,可以执行一次任务,还可以执行周期性任务。

1.0 ScheduledThreadPoolExecutor的用法

定时线程池的类的结构图如下:
在这里插入图片描述

从结构图上可以看出定时线程池ScheduledThreadPoolExecutor继承了线程池ThreadPoolExecutor,也就是说它们之间肯定有相同的行为和属性。

ScheduledThreadPoolExecutor常用发的方法如下

1)schedule():一次行任务,延迟执行,任务只执行一次。

2)scheduleAtFixedRate():周期性任务,不不等待任务结束,每隔周期时间执行一次,新任务放进队列中.

3)scheduleWithFixedDelay():周期性任务,等待任务结束,每隔周期时间执行一次.

代码样例入下:

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class TestScheduledThreadPoolExecutor {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1);
        //无返回值 延迟5秒返回
        scheduledThreadPoolExecutor.schedule(()->{
            System.out.println("我要延迟5秒执行,只执行一次 ");
        },5000, TimeUnit.MICROSECONDS);
    }
}

可以用在启动项目时需要等待对象的加载,延迟执行一个任务。

带返回值的延迟执行任务如下

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class TestScheduledThreadPoolExecutor {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1);
        //有返回值任务  可以用作异步处理任务不用等待结果
       ScheduledFuture<Integer> future =  scheduledThreadPoolExecutor.schedule(()->{
            System.out.println("我要延迟5秒执行,只执行一次 ");
            return 1;
        },5000, TimeUnit.MICROSECONDS);
        System.out.println(future.get());
    }
}

待返回值的任务,可以用于异步处理一个任务,等主线任务执行完,主要任务要知道异步任务的执行状态。

周期性任务:参数一样,方法名字不一样 例子如下

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class TestScheduledThreadPoolExecutor {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1);
        //周期性的任务  发心跳 service1-service2 每次5s,发送一个心跳 下面的例子是不管任务是否执行完,一直想队列中放。 一个任务占一个线程。
        //scheduledThreadPoolExecutor.scheduleAtFixedRate(()->{
        //等待任务执行结束,在间隔2秒执行。
        scheduledThreadPoolExecutor.scheduleWithFixedDelay(()->{
            System.out.println("send heart beat");
            long startTime = System.currentTimeMillis(),nowTime = startTime;
            while((nowTime-startTime)<5000){
                nowTime = System.currentTimeMillis();
                try{
                    Thread.sleep(100);
                }catch (InterruptedException e ){
                    e.printStackTrace();
                }
            }
            System.out.println("task over .....");

            //任务启动多久之后   ,周期 每2s执行一次,时间单位
        },1000,2000,TimeUnit.MILLISECONDS);
    }
}

2.0 定时线程池使用场景

2.1 分布式锁-redis

​ 当使用setnx获取分布式锁(锁是有失效时间的),但是害怕任务没有执行完成锁失效了,怎么办呢?可以在任务的开始用一个定时线程池每隔一段时间看下锁是否失效如果没失效延长失效时间,如果失效不做处理。这样可以保证任务执行完成。

2.2 服务注册中心

服务注册客户端每隔多久向服务中心发送下自己的ip,端口,服务名字及服务状态。

2.3 和Timer的不同
import java.util.Timer;
import java.util.TimerTask;

public class TestTimer {
    public static void main(String[] args) throws InterruptedException {
        Timer timer = new Timer();
        timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                System.out.println("send star  -----");
                throw new RuntimeException("2134    243");
            }
        }, 1000, 2000);
    }
}

上面是的使用方法,从使用方法上和定时线程池的使用方法类似,都是周期性的执行任务;

不同的地方是:

Timer:单线程,线程挂了,不会再创建线程执行任务;

ScheduledThreadPoolExecutor:线程挂了,再提交任务,线程池会创建新的线程执行任务。

3.0 定时任务线程池实现原理

线程池执行过程:调用sechedule相关方法时,会先把任务添加到队列中,再又线程从队列中取出执行。

在这里插入图片描述

它接收SchduledFutureTask类型的任务,是线程调度的最小单位,有三种提交方法:

1)schedule():一次行任务,延迟执行,任务只执行一次。

2)scheduleAtFixedRate():周期性任务,不不等待任务结束,每隔周期时间执行一次,新任务放进队列中.

3)scheduleWithFixedDelay():周期性任务,等待任务结束,每隔周期时间执行一次.

它采用DelayedWorkQueue存储等待的任务:

1)DelayedWorkQueue内部封装了一个PriorityQueue,根据它会根据time的先后时间排序,若time相同则根据sequenceNumber排序;

2)DelayedWorkQueue是一个无界队列;

3.1 SchduledFutureTask

SchduledFutureTask 接收的参数(成员变量):

1)private long time :任务开始的时间;

2)private final long sequenceNumber:任务的序号;

3)private final long period:任务执行的间隔;

工作线程的执行 过程:

  • 工作线程会 从DelayedQueue取已经到期的任务去执行;
  • 执行结束后重新设置任务的到期时间,再次放回DelayedQueue

ScheduledThreadPoolExecutor会把执行的任务放到工作队列DelayedQueue中,DelayedQueue封装了一个PriorityQueue,PriorityQueue会对队列中的SchduledFutureTask 进行排序,具体的排序算法如下:

 public int compareTo(Delayed other) {
            if (other == this) // compare zero if same object
                return 0;
            if (other instanceof ScheduledFutureTask) {
                ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
                long diff = time - x.time;
                if (diff < 0)
                    return -1;
                else if (diff > 0)
                    return 1;
                else if (sequenceNumber < x.sequenceNumber)
                    return -1;
                else
                    return 1;
            }
            long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
            return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
        }

1)首先按照time排序,time小的排在前面,time大的排在后面;

2)如果time相同,按照sequenceNumber排序,sequenceNumber小的排在前面,sequenceNumber大的排在后面。如果两个task的执行时间相同,优先执行先提交的task.

ScheduledFutureTaskn的run方法实现:

run方法是调度task的核心,task 的执行实际是run方法的执行。

 public void run() {
            boolean periodic = isPeriodic();
     //如果当前线程池已经不支持执行任务,则取消
            if (!canRunInCurrentRunState(periodic))
                cancel(false);
     //如果不需要周期性执行,则直接执行run方法
            else if (!periodic)
                ScheduledFutureTask.super.run();
     //如果需要周期性执行,先执行,后设置下次执行时间
            else if (ScheduledFutureTask.super.runAndReset()) {
                //计算下次执行时间
                setNextRunTime();
                //再次将执行任务添加到队列中,重复执行。
                reExecutePeriodic(outerTask);
            }
        }
    }

reExecutePeriodic 源码如下:

    void reExecutePeriodic(RunnableScheduledFuture<?> task) {
        if (canRunInCurrentRunState(true)) {
            super.getQueue().add(task);
            if (!canRunInCurrentRunState(true) && remove(task))
                task.cancel(false);
            else
                ensurePrestart();
        }
    }

该方法和delayExecute方法类似,不同的是:

1)由于调用reExecutePeriodic 方法时已经执行过一次周期性任务了,所以不会reject当前任务;

2)传入的任务一定是周期性任务

3.2 线程池任务提交

首先是schedule方法,该方法指任务在指定延迟时间到达后触发,只会执行一次。

    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay,
                                           TimeUnit unit) {
        //参数校验
        if (callable == null || unit == null)
            throw new NullPointerException();
        //这是一个嵌套结构,首先把用户提交的任务包装成ScheduledFutureTask
        //然后在调用decorateTask进行包装,该方法是留给用户去扩展的,默认是个空方法。
        RunnableScheduledFuture<V> t = decorateTask(callable,
            new ScheduledFutureTask<V>(callable,
                                       triggerTime(delay, unit)));
        delayedExecute(t);
        return t;
    }

scheduleWithFixedDelay周期性执行任务:

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (delay <= 0)
            throw new IllegalArgumentException();
        // 将任务包装成 ScheduledFutureTask 类型
        ScheduledFutureTask<Void> sft =
            new ScheduledFutureTask<Void>(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          unit.toNanos(-delay));
        // 再次装饰任务,可以复写 decorateTask 方法,定制化任务 
        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
        sft.outerTask = t;
        // 放入延时队列中,ScheduledFutureTask 是接口 RunnableScheduledFuture 的一个实现类 
        // 所以放入队列还是 ScheduledFutureTask 类型的
        delayedExecute(t);
        return t;
    }

任务提交方法delayedExecute源码如下:

    private void delayedExecute(RunnableScheduledFuture<?> task) {
        //如果线程池已经关闭,则 使用决绝策略把提交任务拒绝掉
        if (isShutdown())
            reject(task);
        else {
            //与ThreadPoolExecutor不同的,这里直接把任务加入延迟队列
            super.getQueue().add(task);
            if (isShutdown() &&
                !canRunInCurrentRunState(task.isPeriodic()) &&
                //如果当前状态无法执行,则取消
                remove(task))
                task.cancel(false);
            else
                //这里增加了一个worker线程,避免提交的任务没有worker去执行
                //原因就是该类没有像ThreadPoolExecutor 一样,核心worker满了,才放入队列
                ensurePrestart();
        }
    }
3.3 DelayedWorkerQueue

ScheduledThreadPoolExecutor之所以要在自己实现阻塞的工作队列,是因为ScheduledThreadPoolExecutor要求的工作队列有些特殊。

DelayedWorkerQueue是一个基于堆的数据结构,类似于DelayQueue和PriorityQueue。在执行定时任务的时候,每个任务执行时间都不同,所以DelayedWorkerQueue的工作就是按照执行时间的升序来排列,执行时间距离当前时间越近的任务在队列的qianmian(注意:这里的顺序并不是绝对的,堆中的排序只保证了自己的下次执行时间要比父节点的下次执行时间要大,而叶子节点之间并不是顺序的。)

堆结构图如下

在这里插入图片描述

可知,DelayedWorkerQueue是一个基于最小堆结构的队列。堆结构可以使用数组表示,可以转换成如下的数组

在这里插入图片描述

在这种结构中,可以发下如下特点:

假设索引值从0开始,子节点的索引值为K,父节点的索引值为P,则:

  1. 一个节点的左节点的索引为:k=p*2+1;
  2. 一个节点的右节点的索引为:k=(p+1)*2;
  3. 一个节点的父节点的索引为:p=(k-1)/2;

为什么要使用DelayedWorkerQueue呢?

定时任务执行时需要取出最近要执行的任务,所以任务在队列中每次出队时,一定要是当前队列中执行时间最靠前的,所以自然要使用优先级队列。

DelayedWorkerQueue是一个优先级队列,它可以保证每次出队列的任务都是当前队列中执行时间最靠前的,由于它是基于堆结构的队列,堆结构在执行插入和删除操作时的最坏时间复杂度是O(logN).

DelayedWorkerQueue的属性

//队列初始化容量
private static final int INITIAL_CAPACITY = 16;
//根据初始化容量创建RunnableScheduledFuture 类型的数组;
private RunnableScheduledFuture<?>[] queue =
    new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
private final ReentrantLock lock = new ReentrantLock();
private int size = 0;
// leader 线程
private Thread leader = null;
// 当较新的任务在队列的头部可用时,或者新线程可能需要成为leader,则通过该条件发出信号
private final Condition available = lock.newCondition();

注意:这里的leader,它是Leader-Follower模式的变体,用于减少不必要的定时等待。什么意思呢?对于多线程的网络模型来说 所有线程会有三种身份中的一种:leader和follower,以及一个干活中的状态:proccesser。它的基木原则就是,永远最多只有一个leader,而所有follower都在等待成为leader。线程池启动时会自动产生一个Leader负责等待网络IO事件,当有一个事件产生时,Leader线程首先通知一个Follower线程将其提拔为新的Leader,然后自己就去干活了,去处理这个网络事件,处理完毕后加入Follower线程等待队列,等待下次成为Leader。这种方法可以增强CPU高速缓存相似性,及消除动态内存分配和线程间的数据交换。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/408017.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Vue图片路径问题(动态引入:绝对路径、相对路径),require动态路径问题

Vue图片路径问题&#xff08;动/静态引入&#xff1a;绝对路径、相对路径&#xff09; DEMO实例&#xff08;可快速解决问题&#xff09;: 注意&#xff1a;绝对路径方式导入的图片需要存储在 publiic 文件夹下 静态导入相对路径&#xff1a; <img src"../../asset…

Mybatis+Servlet+Mysql 整合的一个小项目:对初学者非常友好,有助于初学者很快的上手Java Web

文章目录前言为何要写&#xff1f;目录结构1 依赖配置1.1 创建一个web项目1.2 依赖需求分析1.3 pom.xml2 配置Mybatis2.1 mybatis-config.xml2.2 UserMapper.xml2.3 UserMapper.interface3 配置Tomcat4 Servlet类4.1 loginServlet014.2 registerRequest015 静态页面代码5.1 Htm…

Vue开发实例(13)之axios和mockjs的安装与使用

作者简介 作者名&#xff1a;编程界明世隐 简介&#xff1a;CSDN博客专家&#xff0c;从事软件开发多年&#xff0c;精通Java、JavaScript&#xff0c;博主也是从零开始一步步把学习成长、深知学习和积累的重要性&#xff0c;喜欢跟广大ADC一起打野升级&#xff0c;欢迎您关注&…

Vue3中 内置组件 Teleport 详解

1. 基本概念 1.1 简单理解 不管是 Vue2 还是 Vue3 中都有内置组件的存在&#xff0c;如 component 内置组件、transition 内置组件等等。内置组件就是官方给我们封装的全局组件&#xff0c;我们直接拿来用就可以了。 在 Vue3 中新增了 Teleport 内置组件&#xff0c;先来看下…

【JavaScript-数组全家福】

目录 前言 数组 1.创建 new Array数组 2.检测是否为数组 1.使用instanceof检测是否为数组 2.使用Array.isArray()来检测 3.添加删除数组方法 4.筛选数组 5.数组排序 6.数组索引方法 7.数组去重 8.数组转字符串 写在最后 前言 博主是&#x1f466;一个帅气的boy&#…

前端案例:飞机大战( js+dom 操作,代码完整,附图片素材)

目录 一、案例效果 二、实现思路 三、完整代码详细注释 四、涉及要点 五、案例素材 一、案例效果 二、实现思路 创建游戏背景板&#xff1b;创建我方战机&#xff0c;鼠标进入游戏面板后其随鼠标轨迹运动&#xff1b; onmousemove创建子弹&#xff0c;让子弹周期性的在战…

Grafana alert预警+钉钉通知

1 Grafana alert预警 如下图所示&#xff0c;主要是前3步&#xff0c;设置alert rules、contact points 、notification policies。alert rules主要设置触发警告的规则&#xff1b;contact points设置通过什么发送预警&#xff0c;如钉钉&#xff1b;notification policies 将…

鼠标事件、键盘事件,你听过嘛?

&#x1f4dc;个人简介 ⭐️个人主页&#xff1a;微风洋洋&#x1f64b;‍♂️ &#x1f351;博客领域&#xff1a;编程基础,后端 &#x1f345;写作风格&#xff1a;干货,干货,还是tmd的干货 &#x1f338;精选专栏&#xff1a;【JavaScript】 &#x1f680;支持洋锅&#xff…

Chrome-谷歌浏览器多开教程

Chrome谷歌浏览器多开教程在我们的日常生活中&#xff0c;我们常常在某一时刻需要在进行多个账号的查看&#xff0c;例如在跨境电商时&#xff0c;我们常常需要开多各店铺页面&#xff0c;又或者&#xff0c;我们在玩游戏时&#xff0c;需要开多个账号同时进行运作&#xff0c;…

一文通透从输入URL到页面渲染的全过程----高频面试

一文通透从输入URL到页面渲染的全过程----高频面试 喜欢大海 喜欢夕阳 写下便是永恒 文章目录一文通透从输入URL到页面渲染的全过程----高频面试重温进程与线程什么是进程什么是线程进程和线程的区别多进程和多线程JS为什么是单线程浏览器相关浏览器是多进程的浏览器包含哪些进…

jeecg-boot首页加载速度优化全过程

优化结果 前端和后端部署在轻量服务器: 以下结果都是三次强刷得到的 优化前: 优化后: 优化方案 开启Nginx压缩 方案来自于:jeecg官方文档 作用:通过nginx内置的压缩策略来压缩静态资源&#xff0c;提升资源请求速度 在nginx.conf 的 http 中加入以下片断: # gzip …

【微信小程序 | 实战开发】常用的视图容器类组件介绍和使用(1)

个人名片: 🐼作者简介:一名大二在校生,喜欢编程🎋 🐻‍❄️个人主页🥇:小新爱学习. 🐼个人WeChat:hmmwx53 🕊️系列专栏:🖼️ 零基础学Java——小白入门必备重识C语言——复习回顾

component lists rendered with v-for should have explicit keys

component lists rendered with v-for should have explicit keys 发现问题 关键报错 (Emitted value instead of an instance of Error) : component lists rendered with v-for should have explicit keys. See https://vuejs.org/guide/list.html#key for more info. 具体…

VUE之Element-ui文件上传详解

引言 对于文件上传&#xff0c;在开发主要涉及到以下两个方面&#xff1a; 单个文件上传和表单一起实现上传&#xff08;这种情况一般都是文件上传之后&#xff0c;后端返回保存在服务器的文件名&#xff0c;最后和我们的表单一起上传&#xff09; 单文件上传 element-ui中…

Cesium加载离线地图和离线地形

文章目录 前言一、Cesium加载离线地图 1.1 下载数据2.2 数据处理2.3 地图发布2.4下载速度改进 二、Cesium加载离线地形 2.1 下载数据2.2 数据处理2.3 地形发布2.4 遇到的问题 前言 直接把地图数据切片&#xff0c;然后通过nginx以静态服务方式发布。 使用工具&#xff1a;…

this.$emit使用方法【前端技术】

this.$emit()主要用于子组件向父组件传值。 下面就给大家举一个实际开发中使用到的案例。 需求&#xff1a; 点击关联项目&#xff0c;弹出关联项目数据进行选择一条数据&#xff0c;点击确定&#xff0c;项目编号会回显到关联项目中。 1新增页面 2 新增页面中点击关联项目弹出…

vue3全局自定义指令实现按钮权限控制

1. 文档介绍的全局自定义指令 在Vue的模板语法中我们除了使用&#xff1a;v-show、v-for、v-model等&#xff0c;Vue其实 也允许我们来自定义自己的指令。 1&#xff09;注意&#xff0c;在 Vue 中&#xff0c;代码复用和抽象的主要形式是组件。 2&#xff09;然而&#xff0c…

HTML+CSS实现搜索框

HTMLCSS实现搜索框&#xff1a; 需求分析&#xff1a; 1、输入框焦点事件 onfocus:成为焦点, 点击输入框的时候&#xff0c;出现闪烁光标&#xff0c;此时可以输入内容。 onblur :失去焦点, 点击页面空白区域&#xff0c;光标消失。此时不可以输入内容。 2、获取元素 3、…

vite配置@别名,以及如何让vscode智能提示路经

vite配置别名 vite.config.ts import { defineConfig } from vite import vue from vitejs/plugin-vue// 配置别名import { resolve } from "path"; // https://vitejs.dev/config/ export default defineConfig({plugins: [vue()],// ↓解析配置resolve: {// ↓路…

HTML基础之form表单

目录 一&#xff1a;表单属性 1 name 属性 2 action属性 3 method属性 4 target属性 5 enctype属性 二&#xff1a;表单对象 1 input标签 2 多行文本textarea 3 下拉列表select 4 表单控件&#xff08;元素&#xff09;button 5 表单控件&#xff08;元素&#xff…