在日常的项目开发中,多多少少都会涉及到一些定时任务的需求。例如每分钟扫描超时支付的订单,每小时清理一次数据库历史数据,每天统计前一天的数据并生成报表,定时去扫描某个表的异常信息(最终一致性的方案也可能涉及),定时启用某个业务开关等等。下面对一些Java中常用的定时任务做一些简单的介绍。
1.Java自带解决方式
Java 中自带的解决方案Timer。
1.Timer
使用 Timer创建 java.util.TimerTask 任务,在 run 方法中实现业务逻辑。通过 java.util.Timer 进行调度,支持按照固定频率执行。所有的 TimerTask 是在同一个线程中串行执行,相互影响。也就是说,对于同一个 Timer 里的多个 TimerTask 任务,如果一个 TimerTask 任务在执行中,其它 TimerTask 即使到达执行的时间,也只能排队等待。如果有异常产生,线程将退出,整个定时任务就失败。在编码的时候,因为是单线程阻塞式的行为,编写代码的时候,阿里巴巴插件会提示采用ScheduledExecutorService来代替Timer。
注:在分布式锁的redission的实现中用到watchdog就是基于这种方式
import java.util.Timer;
import java.util.TimerTask;
public class TestTimerTask {
public static void main(String[] args) {
TimerTask timerTask = new TimerTask() {
@Override
public void run() {
System.out.println("hell world");
}
};
Timer timer = new Timer();
timer.schedule(timerTask, 10, 3000);
}
}
2.ScheduledExecutorService
基于线程池设计的定时任务解决方案,每个调度任务都会分配到线程池中的一个线程去执行,解决 Timer 定时器无法并发执行的问题,支持 fixedRate 和 fixedDelay。
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class TestTimerTask {
public static void main(String[] args) {
ScheduledExecutorService ses = Executors.newScheduledThreadPool(5);
//按照固定频率执行,每隔5秒跑一次
ses.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
System.out.println("hello fixedRate");
}
}, 0, 5, TimeUnit.SECONDS);
//按照固定延时执行,上次执行完后隔3秒再跑
ses.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
System.out.println("hello fixedDelay");
}
}, 0, 3, TimeUnit.SECONDS);
}
}
2.Spring 中自带的解决方案
这个应该就是大多数项目会采用的方式了,也是比较主流的方式。Springboot 中提供了一套轻量级的定时任务工具 Spring Task,通过注解可以很方便的配置,支持 cron 表达式、fixedRate、fixedDelay。
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@Component
@EnableScheduling
public class MyTask {
/**
* 每分钟的第30秒跑一次
*/
@Scheduled(cron = "30 * * * * ?")
public void task1() throws InterruptedException {
System.out.println("hello cron");
}
/**
* 每隔5秒跑一次
*/
@Scheduled(fixedRate = 5000)
public void task2() throws InterruptedException {
System.out.println("hello fixedRate");
}
/**
* 上次跑完隔3秒再跑
*/
@Scheduled(fixedDelay = 3000)
public void task3() throws InterruptedException {
System.out.println("hello fixedDelay");
}
}
Spring Task 相对于上面提到的两种解决方案,最大的优势就是支持 cron 表达式,可以处理按照标准时间固定周期执行的业务,比如每天几点几分执行。这个cron表达式也不用刻意的去记住,只需要大概理解,然后找一个cron生成的网站就行。
同样这个Scheduled的方式默认也是单线程的,如果想采用多线程,可以自定义线程池来结合使用,也可以用@Async的方式指定线程池来使用。这里就不多做介绍了
3.业务幂等解决方案
现在的应用基本都是分布式部署,所有机器的代码都是一样的,前面介绍的 Java 和 Spring 自带的解决方案,都是进程级别的,每台机器在同一时间点都会执行定时任务。这样会导致需要业务幂等的定时任务业务有问题,比如每月定时给用户推送消息,就会推送多次。
于是,很多应用很自然的就想到了使用分布式锁的解决方案。即每次定时任务执行之前,先去抢锁,抢到锁的执行任务,抢不到锁的不执行。怎么抢锁,又是五花八门,比如使用 DB、zookeeper、redis。
1.使用 DB 或者 Zookeeper 抢锁
使用 DB 或者 Zookeeper 抢锁的架构差不多,原理如下:
- 定时时间到了,在回调方法里,先去抢锁。
- 抢到锁,则继续执行方法,没抢到锁直接返回。
- 执行完方法后,释放锁
示例代码如下:
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@Component
@EnableScheduling
public class MyTask {
/**
* 每分钟的第30秒跑一次
*/
@Scheduled(cron = "30 * * * * ?")
public void task1() throws Exception {
String lockName = "task1";
if (tryLock(lockName)) {
System.out.println("hello cron");
releaseLock(lockName);
} else {
return;
}
}
private boolean tryLock(String lockName) {
//TODO
return true;
}
private void releaseLock(String lockName) {
//TODO
}
}
当前的这个设计,仔细一点的同学可以发现,其实还是有可能导致任务重复执行的。比如任务执行的非常快,A 这台机器抢到锁,执行完任务后很快就释放锁了。B 这台机器后抢锁,还是会抢到锁,再执行一遍任务。因为数据库的io也是需要时间,在极限或者并发高的情况下,就会出现等等情况。所以由此衍生出来Redis
2.使用 redis 抢锁
使用 redis 抢锁,其实架构上和 DB/zookeeper 差不多,不过 redis 抢锁支持过期时间,不用主动去释放锁,并且可以充分利用这个过期时间,解决任务执行过快释放锁导致任务重复执行的问题,架构如下:
示例代码如下:
@Component
@EnableScheduling
public class MyTask {
/**
* 每分钟的第30秒跑一次
*/
@Scheduled(cron = "30 * * * * ?")
public void task1() throws InterruptedException {
String lockName = "task1";
if (tryLock(lockName, 30)) {
System.out.println("hello cron");
releaseLock(lockName);
} else {
return;
}
}
private boolean tryLock(String lockName, long expiredTime) {
//TODO
return true;
}
private void releaseLock(String lockName) {
//TODO
}
}
同样如果基于这个的话,可以直接用开源的redission。提供了包括分布式锁,限流,自动续约时间,自动删除锁避免死锁等。
4.使用 Quartz
Quartz 是一套轻量级的任务调度框架,只需要定义了 Job(任务),Trigger(触发器)和 Scheduler(调度器),即可实现一个定时调度能力。支持基于数据库的集群模式,可以做到任务幂等执行。(其实关联的东西很多,而且对数据库要新建很多业务要求的表,如果业务不是很复杂用spring的就足够了)
Quartz 支持任务幂等执行,其实理论上还是抢 DB 锁,我们看下 quartz 的表结构:
其中,QRTZ_LOCKS 就是 Quartz 集群实现同步机制的行锁表,其表结构如下
--QRTZ_LOCKS表结构
CREATE TABLE `QRTZ_LOCKS` (
`LOCK_NAME` varchar(40) NOT NULL,
PRIMARY KEY (`LOCK_NAME`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
--QRTZ_LOCKS记录
+-----------------+
| LOCK_NAME |
+-----------------+
| CALENDAR_ACCESS |
| JOB_ACCESS |
| MISFIRE_ACCESS |
| STATE_ACCESS |
| TRIGGER_ACCESS |
+-----------------+
可以看出 QRTZ_LOCKS 中有 5 条记录,代表 5 把锁,分别用于实现多个 Quartz Node 对 Job、Trigger、Calendar 访问的同步控制。
5.开源任务调度中间件
上面提到的解决方案,在架构上都有一个问题,那就是每次调度都需要抢锁,特别是使用 DB 和 Zookeeper 抢锁,性能会比较差,一旦任务量增加到一定的量,就会有比较明显的调度延时。还有一个痛点,就是业务想要修改调度配置,或者增加一个任务,得修改代码重新发布应用。
于是开源社区涌现了一堆任务调度中间件,通过任务调度系统进行任务的创建、修改和调度,这其中国内最火的就是 XXL-JOB 和 ElasticJob。
1.ElasticJob
ElasticJob 是一款基于 Quartz 开发,依赖 Zookeeper 作为注册中心、轻量级、无中心化的分布式任务调度框架,目前已经通过 Apache 开源。
ElasticJob 相对于 Quartz 来说,从功能上最大的区别就是支持分片,可以将一个任务分片参数分发给不同的机器执行。架构上最大的区别就是使用 Zookeeper 作为注册中心,不同的任务分配给不同的节点调度,不需要抢锁触发,性能上比 Quartz 上强大很多,架构图如下:
开发上也比较简单,和 springboot 结合比较好,可以在配置文件定义任务如下:
elasticjob:
regCenter:
serverLists: localhost:2181
namespace: elasticjob-lite-springboot
jobs:
simpleJob:
elasticJobClass: org.apache.shardingsphere.elasticjob.lite.example.job.SpringBootSimpleJob
cron: 0/5 * * * * ?
timeZone: GMT+08:00
shardingTotalCount: 3
shardingItemParameters: 0=Beijing,1=Shanghai,2=Guangzhou
scriptJob:
elasticJobType: SCRIPT
cron: 0/10 * * * * ?
shardingTotalCount: 3
props:
script.command.line: "echo SCRIPT Job: "
manualScriptJob:
elasticJobType: SCRIPT
jobBootstrapBeanName: manualScriptJobBean
shardingTotalCount: 9
props:
script.command.line: "echo Manual SCRIPT Job: "
实现任务接口如下:
@Component
public class SpringBootShardingJob implements SimpleJob {
@Override
public void execute(ShardingContext context) {
System.out.println("分片总数="+context.getShardingTotalCount() + ", 分片号="+context.getShardingItem()
+ ", 分片参数="+context.getShardingParameter());
}
}
同时,ElasticJob 还提供了一个简单的 UI,可以查看任务的列表,同时支持修改、触发、停止、生效、失效操作。
ElasticJob 暂不支持动态创建任务。
2.XXL-JOB
XXL-JOB 是一个开箱即用的轻量级分布式任务调度系统,其核心设计目标是开发迅速、学习简单、轻量级、易扩展,在开源社区广泛流行。
XXL-JOB 是 Master-Slave 架构,Master 负责任务的调度,Slave 负责任务的执行,架构图如下:
XXL-JOB 接入也很方便,不同于 ElasticJob 定义任务实现类,是通过@XxlJob 注解定义 JobHandler。(安装和集成springboot可自行百度,需要SQL数据库来存储一些相关的表,通杀也提供了动态的数据UI展示)
实例代码:
@Component
public class SampleXxlJob {
private static Logger logger = LoggerFactory.getLogger(SampleXxlJob.class);
/**
* 1、简单任务示例(Bean模式)
*/
@XxlJob("demoJobHandler")
public ReturnT<String> demoJobHandler(String param) throws Exception {
XxlJobLogger.log("XXL-JOB, Hello World.");
for (int i = 0; i < 5; i++) {
XxlJobLogger.log("beat at:" + i);
TimeUnit.SECONDS.sleep(2);
}
return ReturnT.SUCCESS;
}
/**
* 2、分片广播任务
*/
@XxlJob("shardingJobHandler")
public ReturnT<String> shardingJobHandler(String param) throws Exception {
// 分片参数
ShardingUtil.ShardingVO shardingVO = ShardingUtil.getShardingVo();
XxlJobLogger.log("分片参数:当前分片序号 = {}, 总分片数 = {}", shardingVO.getIndex(), shardingVO.getTotal());
// 业务逻辑
for (int i = 0; i < shardingVO.getTotal(); i++) {
if (i == shardingVO.getIndex()) {
XxlJobLogger.log("第 {} 片, 命中分片开始处理", i);
} else {
XxlJobLogger.log("第 {} 片, 忽略", i);
}
}
return ReturnT.SUCCESS;
}
}
XXL-JOB 相较于 ElasticJob,最大的特点就是功能比较丰富,可运维能力比较强,不但支持控制台动态创建任务,还有调度日志、运行报表等功能。(强力推荐)
还有一些企业级别的组件,例如阿里云任务调度 SchedulerX。这个如果有需要就请自行了解了。
感谢!!!