并发编程:ScheduledThreadPoolExecutor你真的了解吗?

news2024/11/18 8:21:40

前言

首先看到标题,我们其实很多人都知道,但是呢 在实际项目中我们面对很多延迟任务实现方案有很多选择,甚至直接在网上百度反正都能实现就行,但是忽略了很多细节,导致生产上的事故,都是因为没有真正了解到底层的运行,当然这篇博文不去说源码而是说实战,源码的尽头都是线程池,这个已经在之前已经详细介绍过了

ScheduledThreadPoolExecutor和@Scheduled的关系

说到定时任务大家首先想到的是@Scheduled注解,其实这个注解的底层就是ScheduledThreadPoolExecutor
在Spring Boot中集成ScheduledThreadPoolExecutor用于执行定时任务,你可以通过以下步骤来实现:

定义线程池配置类

创建一个配置类来定义你的ScheduledThreadPoolExecutor实例。这里是一个简单的示例:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

@Configuration
public class ThreadPoolConfig {

    @Bean
    public ThreadPoolTaskScheduler taskScheduler() {
        ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
        scheduler.setPoolSize(10); // 设置线程池大小
        scheduler.setThreadNamePrefix("scheduled-task-"); // 设置线程前缀
        scheduler.setRemoveOnCancelPolicy(true); // 取消任务时是否移除
        return scheduler;
    }
}

注意,虽然上述示例中使用了ThreadPoolTaskScheduler,它是Spring对定时任务线程池的封装,底层也是基于ScheduledThreadPoolExecutor实现的,因此适合用于集成定时任务。

使用@EnableScheduling开启定时任务支持

在你的Spring Boot主类或配置类上添加@EnableScheduling注解,以启用定时任务功能。

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication
@EnableScheduling
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

编写定时任务方法

在需要执行定时任务的类上使用@Component注解使其成为Spring管理的Bean,并在相应的方法上使用@Scheduled注解来定义任务的执行规则。

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
public class MyScheduledTasks {

    @Autowired
    private SomeService someService;

    @Scheduled(fixedRate = 60000) // 每60秒执行一次
    public void scheduledTask() {
        someService.doSomething();
    }
}

在这个例子中,fixedRate = 60000表示方法执行的间隔时间,单位是毫秒,这里设置为每60秒执行一次。

通过上述步骤,你就可以在Spring Boot应用中集成并使用ScheduledThreadPoolExecutor来执行定时任务了。记得根据实际需求调整线程池的大小和其他参数。

切记spring中几乎所有关于定时任务线程池的封装都是通过ScheduledThreadPoolExecutor来执行的
另外这个有个陷阱那就是一定需要自定义线程池的配置类,为什么呢,看看@Scheduled引发的生产事故

@Scheduled引发的生产事故

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

上面的截图只是一个demo,其中一个定时任务卡死了,导致其他的定时任务无法启动,从而造成了线上故障,而这些定时器任务都是由ThreadPoolTaskScheduleduler来调度的,这个这个线程池的核心线程是1,所以一定要自定义配置类,重新设置核心线程数
在这里插入图片描述
或者
在这里插入图片描述

ScheduledThreadPoolExecutor和@Scheduled能解决什么业务场景

从刚刚上面的例子无论是ThreadPoolTaskScheduler还是@Scheduled其实底层都是ScheduledThreadPoolExecutor进行封装的,那么接下来
我们看下这个如何使用呢?

scheduledExecutorService.scheduled
等待执行时间之后,运行任务。(任务,延迟时间,延迟时间单位)
ScheduledExecutorService.schedule(Callable callable, long delay, TimeUnit unit)

主程序

创建5个任务,分别延迟1 2 3 4 5 秒执行,然后等待任务完成。

package xyz.jangle.thread.test.n4_6.schedule;
 
import java.util.Date;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
 
/**
 * 4.6、schedule 在执行器内延迟执行任务
 * 	创建5个任务,分别延迟1 2 3 4 5 秒执行,然后等待任务完成。
 * 
 * 	注:如果希望在指定时间执行任务, 则计算当前到指定时间的间隔。
 * @author jangle
 * @email jangle@jangle.xyz
 * @time 2020年8月21日 下午7:00:39
 * 
 */
public class M {
 
	public static void main(String[] args) {
 
		ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
/*
//		ScheduledThreadPoolExecutor threadPool = (ScheduledThreadPoolExecutor) scheduledExecutorService;
		// 设置false的情况下,会使得调用shutdown()后,未执行的延迟任务不再执行(默认是true,继续执行延迟的任务)
//		threadPool.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
*/
		System.out.println("Main:开始执行主程序" + new Date());
		for (int i = 0; i < 5; i++) {
			Task task = new Task("Task " + i);
			// 执行task任务,延迟i+1秒执行。
			scheduledExecutorService.schedule(task, i + 1, TimeUnit.SECONDS);
		}
		// shutdown 之后,是否继续执行延迟任务受属性executeExistingDelayedTasksAfterShutdown影响
		scheduledExecutorService.shutdown();
		try {
			// 等待所有任务执行完毕,最长等待2天时间。
			scheduledExecutorService.awaitTermination(2, TimeUnit.DAYS);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		System.out.println("Main:结束执行" + new Date());
	}
 
}

任务

package xyz.jangle.thread.test.n4_6.schedule;
 
import java.util.Date;
import java.util.concurrent.Callable;
 
/**
 * 	普通的任务
 * @author jangle
 * @email jangle@jangle.xyz
 * @time 2020年8月21日 下午7:02:27
 * 
 */
public class Task implements Callable<String> {
 
	private final String name;
 
	public Task(String name) {
		super();
		this.name = name;
	}
 
	@Override
	public String call() throws Exception {
		System.out.println(this.name + "开始执行任务" + new Date());
		return "hello,schedule task";
	} 
}

上面的代码可以优化下:

package xyz.jangle.thread.test.n4_6.schedule;

import java.time.LocalDateTime;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * 示例展示如何使用ScheduledExecutorService安排延时执行的任务。
 * 五个任务分别延迟1至5秒后执行,主程序等待所有任务完成。
 * 
 * 注意:若希望在特定时间点执行任务,需计算当前时间至该时间点的时间差。
 * 
 * @author Jangle
 * @since 2020-08-21
 */
public class ScheduledTaskDemo {

    public static void main(String[] args) {
        // 创建单线程的ScheduledExecutorService
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);

        System.out.println("主程序开始执行:" + LocalDateTime.now());
        
        // 提交五个延迟任务
        for (int delay = 1; delay <= 5; delay++) {
            int finalDelay = delay; // 使用final或effectively final变量在lambda表达式中
            executor.schedule(() -> {
                System.out.println("执行任务:Task " + finalDelay + " at " + LocalDateTime.now());
            }, finalDelay, TimeUnit.SECONDS);
        }

        // 关闭调度器,不再接受新任务,已提交的任务将继续执行
        executor.shutdown();

        try {
            // 等待所有任务完成,最多等待2天
            if (!executor.awaitTermination(2, TimeUnit.DAYS)) {
                System.err.println("等待超时,部分任务未完成。");
            }
        } catch (InterruptedException e) {
            executor.shutdownNow(); // 中断等待,尝试取消正在执行的任务
            Thread.currentThread().interrupt(); // 保持中断状态
            System.err.println("主线程被中断。");
        } finally {
            System.out.println("主程序执行结束:" + LocalDateTime.now());
        }
    }
}

优化点包括:
使用LocalDateTime.now()替代new Date()以获得更易读的日期时间表示。
将类名M改为更具描述性的ScheduledTaskDemo。
注释内容更新,提供更清晰的说明。
在循环内部使用final或实际上的final变量finalDelay,以符合lambda表达式的语法要求。
在awaitTermination后增加处理逻辑,以应对等待超时或线程中断的情况,提高了代码的健壮性。
移除了注释掉的代码片段,保持代码清爽。若需要自定义ScheduledThreadPoolExecutor的行为,应显式创建并配置,而不是向下转型。

如何获取各个定时任务的返回结果呢

其实这个问题我在别的博文中已经详细介绍过了,为了让大家将之前所学的知识串起来,附上链接可以跳转
【CompletableFuture】批量异步任务处理
解决方案一:使用Future来收集结果集
如果您需要获取并打印每个定时任务的返回结果,您需要将Runnable改为Callable,以便任务能返回结果,并使用Future来接收这些结果。下面是修改后的代码示例:

package xyz.jangle.thread.test.n4_6.schedule;

import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

public class ScheduledTaskDemoWithResults {

    public static void main(String[] args) {
        // 创建单线程的ScheduledExecutorService
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);

        System.out.println("主程序开始执行:" + LocalDateTime.now());

        List<Future<String>> futures = new ArrayList<>();

        // 提交五个延迟任务,这些任务现在返回一个字符串结果
        for (int delay = 1; delay <= 5; delay++) {
            int finalDelay = delay; // 使用final或effectively final变量在lambda表达式中
            Future<String> future = executor.schedule(() -> {
                System.out.println("执行任务:Task " + finalDelay + " at " + LocalDateTime.now());
                return "Result from Task " + finalDelay;
            }, finalDelay, TimeUnit.SECONDS);
            futures.add(future); // 收集Future对象
        }

        // 关闭调度器,不再接受新任务,已提交的任务将继续执行
        executor.shutdown();

        try {
            // 等待所有任务完成,并收集结果
            for (Future<String> future : futures) {
                // get()方法会阻塞直到结果可用,这里可以使用get(long timeout, TimeUnit unit)来设定超时
                String result = future.get();
                System.out.println("任务结果:" + result);
            }
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
            executor.shutdownNow(); // 中断等待,尝试取消正在执行的任务
            Thread.currentThread().interrupt(); // 保持中断状态
            System.err.println("主线程被中断或任务执行时出现异常。");
        } finally {
            System.out.println("主程序执行结束:" + LocalDateTime.now());
        }
    }
}

解决方案二:使用CompletableFuture.allOf()来收集结果集
使用 java.util.concurrent.CompletableFuture 和 CompletableFuture.allOf() 方法是一种更现代且灵活的方式来管理异步任务,尤其是当你需要等待一组任务全部完成后再继续执行后续操作时。下面是如何在你的场景中使用 CompletableFuture 和 allOf() 的示例:

package xyz.jangle.thread.test.n4_6.schedule;

import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

public class ScheduledTaskDemoWithAllOf {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        // 创建单线程的ScheduledExecutorService
        ExecutorService executor = Executors.newSingleThreadScheduledExecutor();

        System.out.println("主程序开始执行:" + LocalDateTime.now());

        List<CompletableFuture<String>> futures = new ArrayList<>();

        // 提交五个延迟任务,使用CompletableFuture来代表异步操作
        for (int delay = 1; delay <= 5; delay++) {
            int finalDelay = delay;
            CompletableFuture<String> future = CompletableFuture.supplyAsync(
                    () -> {
                        System.out.println("执行任务:Task " + finalDelay + " at " + LocalDateTime.now());
                        return "Result from Task " + finalDelay;
                    },
                    executor // 使用自定义的Executor
            ).orTimeout(2, TimeUnit.SECONDS); // 可选:为每个任务设置超时
            futures.add(future);
        }

        // 创建一个 CompletableFuture,它将在所有输入的CompletableFuture完成后完成
        CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));

        // 当所有任务都完成时,打印所有结果
        allFutures.thenAccept(v -> {
            for (CompletableFuture<String> future : futures) {
                try {
                    String result = future.get(); // 这里会立即返回,因为我们已经知道所有任务都完成了
                    System.out.println("任务结果:" + result);
                } catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace();
                }
            }
            System.out.println("所有任务已完成:" + LocalDateTime.now());
        });

        // 关闭ExecutorService
        executor.shutdown();
        executor.awaitTermination(2, TimeUnit.DAYS); // 等待所有任务完成

        System.out.println("主程序执行结束:" + LocalDateTime.now());
    }
}

执行结果

Main:开始执行主程序Fri Aug 21 19:19:40 CST 2020
Task 0开始执行任务Fri Aug 21 19:19:41 CST 2020
Task 1开始执行任务Fri Aug 21 19:19:42 CST 2020
Task 2开始执行任务Fri Aug 21 19:19:43 CST 2020
Task 3开始执行任务Fri Aug 21 19:19:44 CST 2020
Task 4开始执行任务Fri Aug 21 19:19:45 CST 2020
Main:结束执行Fri Aug 21 19:19:45 CST 2020

总结

本篇博文的主题中心还是和大家介绍下定时任务如何使用且大家常见的用法之间的联系,各个spring底层封存的工具类有什么联系包括使用陷阱,其实解决的场景只有一个就是为了延迟处理任务,除了使用定时任务线程池之外,还有其他更好的方案【如何优雅的实现延迟消息多次提醒】方案集合

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1692574.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

根据多个坐标经纬度获取到中心点的经纬度,scala语言

文章目录 前言scala 代码 总结 前言 Scala 语言 通过多个经纬度坐标点, 计算出中心点, 这里使用的是 Scala 语言,其他的语言需要自行转换。求出来的并不是原有的点&#xff0c;而是原有点的中心位置的点。 scala 代码 package com.dw.process.midimport java.lang.Double.pa…

数据库DCL语句

数据库DCL语句 介绍&#xff1a; DCL英文全称是Data Control Language(数据控制语言)&#xff0c;用来管理数据库用户、控制数据库的访 问权限。 管理用户&#xff1a; 查询用户: select * from mysql.user;创建用户: create user 用户名主机名 identified by 密码;修改用…

CSDN 自动评论互动脚本

声明 该脚本的目的只是为了提升博客创作效率和博主互动效率,希望大家还是要尊重各位博主的劳动成果。 数据库设计 尽量我们要新建一个数据库csdn_article,再在其中建一个数据表article -- csdn_article-- article-- 需要进行自动评论的表格信息...CREATE TABLE `article`…

python低阶基础100题(上册)

** python低阶基础100题&#xff08;上册&#xff09; ** 1. 请打印出字符串 Hello World print("Hello World")2. 请打印出字符串 爸爸妈妈&#xff0c;你们辛苦啦 print("爸爸妈妈&#xff0c;你们辛苦啦")3. 请打印出字符串 人生苦短&#xff0c;我…

微信小程序uniapp+django洗脚按摩足浴城消费系统springboot

原生wxml开发对Node、预编译器、webpack支持不好&#xff0c;影响开发效率和工程构建。所以都会用uniapp框架开发 前后端分离&#xff0c;后端给接口和API文档&#xff0c;注重前端,接近原生系统 使用Navicat或者其它工具&#xff0c;在mysql中创建对应名称的数据库&#xff0…

安卓玩机搞机技巧综合资源----自己手机制作证件照的几种方法 免费制作证件照

接上篇 安卓玩机搞机技巧综合资源------如何提取手机分区 小米机型代码分享等等 【一】 安卓玩机搞机技巧综合资源------开机英文提示解决dm-verity corruption your device is corrupt. 设备内部报错 AB分区等等【二】 安卓玩机搞机技巧综合资源------EROFS分区格式 小米红…

Hive运行错误

Hive 文章目录 Hive错误日志错误SessionHiveMetaStoreClientql.Driver: FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.mr.MapRedTaskerror: Could not find or load main class org.apache.hadoop.mapreduce.v2.app.MRAppMaster Please check …

什么是创造力?如何判断自己的创造力?

创造力&#xff0c;主要表现为创新思想、发现和创造新事物的能力&#xff0c;是知识&#xff0c;智力和能力的综合能力&#xff0c;尤其是在职业发展方面&#xff0c;创造力具有重要的意义&#xff0c;企业的核心竞争力就来源于创造力&#xff0c;这就需要具有创造力的员工来推…

阿尔杰姆·卢金采访

近年来&#xff0c;在中俄两国元首亲自擘画、战略引领下&#xff0c;两国新时代全面战略协作伙伴关系成熟坚韧、稳如泰山&#xff0c;树立了新型大国关系的新范式。中俄关系走出了一条大国战略互信、邻里友好的相处之道&#xff0c;给两国人民带来了实实在在的好处&#xff0c;…

使用Python将Word文档中的图片提取并生成PowerPoint幻灯片

在这篇博客中&#xff0c;我们将学习如何使用Python将Word文档中的图片提取出来并生成一个PowerPoint幻灯片。我们将借助wxPython、python-docx和python-pptx这三个强大的库来实现这一目标。以下是实现这个功能的完整过程。 C:\pythoncode\new\wordTOppt.py 所需库 首先&…

8.继承和多态

继承 为什么需要继承 Java中使用类对现实世界中实体来进行描述&#xff0c;类经过实例化之后的产物对象&#xff0c;则可以用来表示现实中的实体&#xff0c;但是现实世界错综复杂&#xff0c;事物之间可能会存在一些关联 比如&#xff1a;狗和猫 它们都是一个动物 代码举例…

文件系统--软硬链接

文章目录 现象软链接硬链接 现象 建立软链接 建立硬链接 // 删除软硬链接都可以用 unlink 指令 unlink soft-link软链接 软链接是一个独立的文件&#xff0c;因为有独立的inode number 软链接的内容&#xff1a;目标文件所对应的路劲字符串如果我们直接查看软链接文件&#…

在matlab里面计算一组给定参数的方程的解

如&#xff1a; k (1:1024); f (x)(1-x-k.*x.^2); 在这段代码给出了一组函数&#xff0c;若需要计算f0&#xff0c;可以通过自带的函数实现&#xff1a; x0 zeros(length(k),1); options optimoptions(fsolve,Display,none,TolX,tol,TolFun,tol); tic for ik 1:length…

BERT ner 微调参数的选择

针对批大小和学习率的组合进行收敛速度测试&#xff0c;结论&#xff1a; 相同轮数的条件下&#xff0c;batchsize-32 相比 batchsize-256 的迭代步数越多&#xff0c;收敛更快批越大的话&#xff0c;学习率可以相对设得大一点 画图代码&#xff08;deepseek生成&#xff09;…

aws glue配置读取本地kafka数据源

创建连接时填写本地私有ip地址&#xff0c;选择网络配置 配置任务选择kafka作为数据源 但是执行任务时日志显示连接失败 文档提到只能用加密通信 如果您希望与 Kafka 数据源建立安全连接&#xff0c;请选择 Require SSL connection (需要 SSL 连接)&#xff0c;并在 Kafka priv…

Nginx - 集成ModSecurity实现WAF功能

文章目录 Pre概述什么是ModSecurity&#xff1f;ModSecurity的工作原理主要功能和特点使用场景与其他安全解决方案的比较 ModSecurity在Nginx中的应用安装ModSecurity配置ModSecurity Pre Nginx - 集成Waf 功能 概述 ModSecurity是一款开源的Web应用防火墙&#xff08;WAF&a…

Java反射角度理解spring

概述 Java反射&#xff08;Reflection&#xff09;是Java编程语言的一个特性&#xff0c;它允许在运行时对类、接口、字段和方法进行动态查询和操作。反射提供了一种在运行时查看和修改程序行为的能力&#xff0c;这通常用于实现一些高级功能&#xff0c;如框架(Spring)、ORM&…

考研计组chap1计算机系统概述

目录 一、计算机发展历程(不考了) 二、计算机硬件的基本组成 3 1.五个部分 &#xff08;1&#xff09;输入设备 &#xff08;2&#xff09;控制器 &#xff08;3&#xff09;运算器 &#xff08;4&#xff09;&#xff08;主&#xff09;存储器 &#xff08;5&#xff0…

探索Python技巧:零基础学习缩进与逻辑关系

新书上架~&#x1f447;全国包邮奥~ python实用小工具开发教程http://pythontoolsteach.com/3 欢迎关注我&#x1f446;&#xff0c;收藏下次不迷路┗|&#xff40;O′|┛ 嗷~~ 目录 一、理解Python的缩进语法 缩进规则详解 二、缩进在逻辑关系中的应用 逻辑块示例 三、实…

Leecode热题100---55:跳跃游戏(贪心算法)

题目&#xff1a; 给你一个非负整数数组 nums &#xff0c;你最初位于数组的 第一个下标 。数组中的每个元素代表你在该位置可以跳跃的最大长度。 判断你是否能够到达最后一个下标&#xff0c;如果可以&#xff0c;返回 true &#xff1b;否则&#xff0c;返回 false 。 贪心算…