SpringBoot教程(二十一) | SpringBoot实现定时任务
- 单点定时任务
- 方式一:使用@Scheduled+@EnableScheduling注解
- 巨坑(@Scheduled任务都用了同一个线程去执行,导致定时任务存在堵塞)
- 解决办法一:添加自定义的ThreadPoolTaskScheduler配置(为调度配置多个线程)
- 解决办法二(建议用这个):使用异步包裹定时任务
- 分布式定时任务
- 方式一:使用elastic-job
- 1、引入相关依赖
- 2、application.yml中配置注册中心和作业调度
- 3、job实例
- ElasticJob-UI监控平台
参考文章:
【1】IDEA SpringBoot实现定时任务(保姆级教程,超详细!!!)
【2】SpringBoot整合分布式任务调度Elastic-Job
单点定时任务
方式一:使用@Scheduled+@EnableScheduling注解
在Spring Boot项目中使用@Scheduled注解实现定时任务时,你通常不需要额外导入特定的依赖,
因为@Scheduled是Spring框架的核心功能之一,并且Spring Boot会自动配置与调度相关的组件。
@EnableScheduling:Spring框架提供的一个注解,用于启用基于注解的定时任务调度功能。当在Spring的配置类(如使用@Configuration注解的类)上使用@EnableScheduling注解时,Spring会自动配置一个任务调度器(TaskScheduler),负责管理所有带有@Scheduled注解的方法。
@Scheduled:Spring框架中用于定时任务调度的注解。它允许开发者将一个方法标记为定时任务,并配置任务的执行时间间隔或Cron表达式,从而实现在指定时间或按照指定周期自动执行该方法的功能。 除了配置Cron表达式外,还可以通过fixedRate和fixedDelay两种方式设置定时任务,这两种方式可以自行了解。
@Slf4j
@Component
@EnableScheduling
public class DemoTask {
// 每5秒执行一次
@Scheduled(cron = "0/5 * * * * ? ")
public void testSchedule1() {
log.info("第 1 个定时任务"+"当前执行任务的线程号ID===>{}", Thread.currentThread().getId()); // 日志输出
}
// 每10秒执行一次
@Scheduled(cron = "0/10 * * * * ? ")
public void testSchedule2() {
log.info("第 2 个定时任务"+"当前执行任务的线程号ID===>{}", Thread.currentThread().getId()); // 日志输出
}
}
项目启动后
是成功且正常执行的
当我把阻塞sleep加进去以后
@Slf4j
@Component
@EnableScheduling
public class DemoTask {
// 每10秒执行一次
@Scheduled(cron = "0/10 * * * * ? ")
public void testSchedule1() {
Thread.sleep(10000);//休眠10秒
log.info("第 1 个定时任务"+"当前执行任务的线程号ID===>{}", Thread.currentThread().getId()); // 日志输出
}
// 每10秒执行一次
@Scheduled(cron = "0/10 * * * * ? ")
public void testSchedule2() {
log.info("第 2 个定时任务"+"当前执行任务的线程号ID===>{}", Thread.currentThread().getId()); // 日志输出
}
}
会发现怎么定时任务2变成间隔20秒执行一次了
是因为会执行这个定时任务使用的线程号ID都是同一个,任务1堵塞了10秒导致影响了后面任务2的执行(说明都用了同一个线程去执行定时任务的,简直巨坑!!!)。
巨坑(@Scheduled任务都用了同一个线程去执行,导致定时任务存在堵塞)
解决办法一:添加自定义的ThreadPoolTaskScheduler配置(为调度配置多个线程)
默认情况下,Spring会尝试在Spring应用上下文中查找一个名为taskScheduler的bean,这个bean必须是TaskScheduler接口的实现。
一旦找到了这个bean,Spring就会使用它来调度所有@Scheduled注解标记的方法。
所以Bean的name不能随便乱写
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
@Configuration
public class SchedulerConfig {
//这个bean名字不要乱改,否则会不生效
@Bean(name = "taskScheduler")
public ThreadPoolTaskScheduler taskScheduler() {
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
// 设置线程池的核心大小
scheduler.setPoolSize(10);
// 其他设置,如线程名称前缀等(可选)
scheduler.setThreadNamePrefix("wocao-task-");
// 初始化调度器
scheduler.initialize();
return scheduler;
}
}
效果如下:
定时任务1不会堵塞定时任务2了,且 定时任务 都是10秒钟执行一次,不会存在堵塞延迟
解决办法二(建议用这个):使用异步包裹定时任务
(1)首先配置自定义线程池
@Configuration
public class DemoTheadPoolConfig {
@Bean(name = "taskExecutor")
public TaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//设置核心线程数
executor.setCorePoolSize(10);
//设置最大线程数
executor.setMaxPoolSize(20);
//缓冲队列200:用来缓冲执行任务的队列
executor.setQueueCapacity(200);
//线程活路时间 60 秒
executor.setKeepAliveSeconds(60);
//线程池名的前缀:设置好了之后可以方便我们定位处理任务所在的线程池
executor.setThreadNamePrefix("demo-thread-");
//设置拒绝策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.setWaitForTasksToCompleteOnShutdown(true);
return executor;
}
}
(2) 执行异步操作
方法一:使用工具类CompletableFuture.runAsync
CompletableFuture.runAsync 是 Java 8 引入的一个非常有用的工具,用于异步执行任务
runAsync
方法用于启动一个异步任务,这个任务不返回结果(即返回类型为void
)。
基本用法
runAsync
方法有两种形式:
-
无参数版本:使用系统默认的
ForkJoinPool
来异步执行任务。CompletableFuture.runAsync(() -> { // 这里是异步执行的任务 System.out.println("异步任务执行中:" + Thread.currentThread().getName()); });
在这个例子中,
runAsync
接收一个Runnable
函数式接口的实现(即一个不接受参数且不返回结果的run
方法),并在一个默认的线程池中异步执行这个任务。 -
带 Executor 参数版本:允许你指定一个自定义的
Executor
来执行异步任务。Executor executor = Executors.newFixedThreadPool(5); // 创建一个固定大小的线程池 CompletableFuture.runAsync(() -> { // 这里是异步执行的任务 System.out.println("异步任务执行中,使用自定义线程池:" + Thread.currentThread().getName()); }, executor);
在这个例子中,
runAsync
除了接收一个Runnable
任务外,还接收一个Executor
参数,允许你控制异步任务的执行线程。
@Slf4j
@Component
@EnableScheduling
public class DemoTask {
@Autowired
@Qualifier("taskExecutor") // 确保使用我们自定义的线程池
private TaskExecutor taskExecutor; // 注入 TaskExecutor
// 每10秒执行一次
@Scheduled(cron = "0/10 * * * * ? ")
public void testSchedule1() {
CompletableFuture.runAsync(() -> {
try {
Thread.sleep(10000); // 休眠10秒,模拟业务场景执行时间
log.info("第 1 个定时任务"+"当前执行任务的线程号ID===>{}", Thread.currentThread().getId()); // 日志输出
} catch (Exception e) {
e.printStackTrace();
}
}, taskExecutor);
}
// 每10秒执行一次
@Scheduled(cron = "0/10 * * * * ? ")
public void testSchedule2() {
CompletableFuture.runAsync(() -> {
try {
log.info("第 2 个定时任务"+"当前执行任务的线程号ID===>{}", Thread.currentThread().getId()); // 日志输出
} catch (Exception e) {
e.printStackTrace();
}
}, taskExecutor);
}
}
效果如下
定时任务1不会堵塞定时任务2了,且 定时任务 都是10秒钟执行一次,不会存在堵塞延迟
方法二:使用@Async+@EnableAsync
@EnableAsync:开启异步任务
@Async:给希望异步执行的方法标注
一般使用@Async都会指定自定义的线程池
在此处的例子应该写成这样@Async(value = “TaskExecutor”)
@Slf4j
@Component
@EnableAsync
@EnableScheduling
public class DemoTask {
// 每10秒执行一次
@Async("taskExecutor")//指定自定义的线程池
@Scheduled(cron = "0/10 * * * * ? ")
public void testSchedule1() {
try {
Thread.sleep(10000); // 休眠10秒,模拟业务场景执行时间
log.info("第 1 个定时任务"+"当前执行任务的线程号ID===>{}", Thread.currentThread().getId()); // 日志输出
} catch (Exception e) {
e.printStackTrace();
}
}
// 每10秒执行一次
@Async("taskExecutor")//指定自定义的线程池
@Scheduled(cron = "0/10 * * * * ? ")
public void testSchedule2() {
try {
log.info("第 2 个定时任务"+"当前执行任务的线程号ID===>{}", Thread.currentThread().getId()); // 日志输出
} catch (Exception e) {
e.printStackTrace();
}
}
}
效果如下
定时任务1不会堵塞定时任务2了,且 定时任务 都是10秒钟执行一次,不会存在堵塞延迟
分布式定时任务
方式一:使用elastic-job
elastic-job是当当网基于quartz 二次开发的弹性分布式任务调度系统,功能丰富强大,采用zookeeper实现分布式协调,实现任务高可用以及分片。
Elastic-Job是一个分布式调度的解决方案,由当当网开源,它由两个相互独立的子项目Elastic-Job-Lite和Elastic-Job-Cloud组成,使用Elastic-Job可以快速实现分布式任务调度。
zk在elastic-job的作用
Elastic-Job依赖ZooKeeper完成对执行任务信息的存储(如任务名称、任务参与实例、任务执行策略等);
Elastic-Job依赖ZooKeeper实现选举机制,在任务执行实例数量变化时(如在快速上手中的启动新实例或停止实例),会触发选举机制来决定让哪个实例去执行该任务。
我这边主要讲SpringBoot,所以肯定会采用场景启动器starter的
什么是场景启动器?
也就是spring-boot-starter- 开头的依赖,
以下是一些常见的Spring Boot场景启动器示例:
(1)spring-boot-starter-data-jpa:包含Spring Data JPA和Hibernate的依赖,用于简化数据库访问和JPA(Java Persistence API)的使用。
(2)spring-boot-starter-data-mongodb:包含Spring Data MongoDB的依赖,用于简化MongoDB数据库的访问。
(3)spring-boot-starter-security:包含Spring Security的依赖,用于添加安全功能,如用户认证和授权。
(4)… … … …使用Spring Boot的场景启动器确实可以让您避免手动添加所需的依赖,因为它已经为您预先定义并包含了这些依赖。这样,您就可以更专注于业务逻辑的实现,而不是花费大量时间在依赖管理和配置上了。
1、引入相关依赖
<!-- 引入版本的时候要充分考虑自己的Zookeeper Server版本, 建议与其保持一致 -->
<!-- 例如 版本使用的是 zk 3.6.x, Zookeeper Server 也是3.6.x, 如果使用3.4.x则会报错 -->
<dependency>
<groupId>org.apache.shardingsphere.elasticjob</groupId>
<artifactId>elasticjob-lite-spring-boot-starter</artifactId>
<version>3.0.2</version>
</dependency>
2、application.yml中配置注册中心和作业调度
server:
port: 9999
#elasticjob配置
elasticjob:
# 注册中心配置
reg-center:
# 连接 ZooKeeper 服务器的列表, 包括 IP 地址和端口号,多个地址用逗号分隔
server-lists: 127.0.0.1:2188
# ZooKeeper 的命名空间
namespace: elastic-job-spring
# 等待重试的间隔时间的初始毫秒数
base-sleep-time-milliseconds: 1000
# 等待重试的间隔时间的最大毫秒数
maxSleepTimeMilliseconds: 3000
# 最大重试次数
maxRetries: 3
# 会话超时毫秒数
sessionTimeoutMilliseconds: 60000
# 连接超时毫秒数
connectionTimeoutMilliseconds: 15000
# 作业配置, 更多的配置参考官网
jobs:
springJob: # job名
elasticJobClass: com.harvey.demo.job.SpringBootJob
cron: 0/5 * * * * ?
shardingTotalCount: 2
shardingItemParameters: 0=Beijing,1=Shanghai
springTestJob:
elasticJobClass: com.harvey.demo.job.SpringRunnerJob
cron: 0/10 * * * * ?
shardingTotalCount: 3
shardingItemParameters: 0=Beijing,1=Shanghai,2=Guangzhou
3、job实例
第一个job:
@Component
public class SpringBootJob implements SimpleJob {
@Override
public void execute(ShardingContext shardingContext) {
System.out.println(this.getClass() + ":分片总数是: [{" + shardingContext.getShardingTotalCount() + "}]; 当前分片项是: [{" + shardingContext.getShardingItem() + "}]");
//分片参数,(0=text,1=image,2=video,参数就是text、image...)
String shardingParameter = shardingContext.getShardingParameter();
//任务参数, 配置是name=test就是name=test
String jobParameter = shardingContext.getJobParameter();
System.out.println("current sharding item " + shardingContext.getShardingItem() + ", shardingParameter = " + shardingParameter + ", jobParameter:" + jobParameter);
}
}
第二个job:
@Component
public class SpringRunnerJob implements SimpleJob {
@Override
public void execute(ShardingContext shardingContext) {
System.out.println(this.getClass() + ":分片总数是: [{" + shardingContext.getShardingTotalCount() + "}]; 当前分片项是: [{" + shardingContext.getShardingItem() + "}]");
//分片参数,(0=text,1=image,2=video,参数就是text、image...)
String shardingParameter = shardingContext.getShardingParameter();
//任务参数
String jobParameter = shardingContext.getJobParameter();
System.out.println("current sharding item " + shardingContext.getShardingItem() + ", shardingParameter = " + shardingParameter + ", jobParameter:" + jobParameter);
}
}
启动项目,可以看到相关的日志。
ElasticJob-UI监控平台
适用于【场景启动器starter】方式开发的elastic-job。
下载ElasticJob-UI
https://shardingsphere.apache.org/elasticjob/current/cn/downloads/
解压后在bin目录双击启动
(本人是在windows环境)
访问控制台
启动后游览器访问(默认端口是8088):http://127.0.0.1:8088/#/login 用户名/密码 root/root
登录成功后,链接上注册中心,链接成功后便可以进行任务的管理。