xxl-job
虽然java自带定时器,但是在springcloud内,如果对多个模块进行统一任务调度,这是自带的定时器就显得不够用,这时就可以使用xxl-job。
xxl-job是一个轻量级分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。现已开放源代码并接入多家公司线上产品线,开箱即用。
特点
1、简单:支持通过 Web 页面对任务进行 CRUD 操作,操作简单,一分钟上手;
2、动态:支持动态修改任务状态、启动 / 停止任务,以及终止运行中任务,即时生效;
3、调度中心 HA(中心式):调度采用中心式设计,“调度中心” 自研调度组件并支持集群部署,可保证调度中心 HA;
4、执行器 HA(分布式):任务分布式执行,任务 “执行器” 支持集群部署,可保证任务执行 HA;
5、注册中心:执行器会周期性自动注册任务,调度中心将会自动发现注册的任务并触发执行。同时,也支持手动录入执行器地址;
6、弹性扩容缩容:一旦有新执行器机器上线或者下线,下次调度时将会重新分配任务;
7、路由策略:执行器集群部署时提供丰富的路由策略,包括:第一个、最后一个、轮询、随机、一致性 HASH、最不经常使用、最近最久未使用、故障转移、忙碌转移等;
8、故障转移:任务路由策略选择 “故障转移” 情况下,如果执行器集群中某一台机器故障,将会自动 Failover 切换到一台正常的执行器发送调度请求。
9、阻塞处理策略:调度过于密集执行器来不及处理时的处理策略,策略包括:单机串行(默认)、丢弃后续调度、覆盖之前调度;
10、任务超时控制:支持自定义任务超时时间,任务运行超时将会主动中断任务;
11、任务失败重试:支持自定义任务失败重试次数,当任务失败时将会按照预设的失败重试次数主动进行重试;其中分片任务支持分片粒度的失败重试;
12、任务失败告警;默认提供邮件方式失败告警,同时预留扩展接口,可方便的扩展短信、钉钉等告警方式;
13、分片广播任务:执行器集群部署时,任务路由策略选择 “分片广播” 情况下,一次任务调度将会广播触发集群中所有执行器执行一次任务,可根据分片参数开发分片任务;
14、动态分片:分片广播任务以执行器为维度进行分片,支持动态扩容执行器集群从而动态增加分片数量,协同进行业务处理;在进行大数据量业务操作时可显著提升任务处理能力和速度。
15、事件触发:除了 “Cron 方式” 和 “任务依赖方式” 触发任务执行之外,支持基于事件的触发任务方式。调度中心提供触发任务单次执行的 API 服务,可根据业务事件灵活触发。
16、任务进度监控:支持实时监控任务进度;
17、Rolling 实时日志:支持在线查看调度结果,并且支持以 Rolling 方式实时查看执行器输出的完整的执行日志;
18、GLUE:提供 Web IDE,支持在线开发任务逻辑代码,动态发布,实时编译生效,省略部署上线的过程。支持 30 个版本的历史版本回溯。
19、脚本任务:支持以 GLUE 模式开发和运行脚本任务,包括 Shell、Python、NodeJS、PHP、PowerShell 等类型脚本;
20、命令行任务:原生提供通用命令行任务 Handler(Bean 任务,“CommandJobHandler”);业务方只需要提供命令行即可;
21、任务依赖:支持配置子任务依赖,当父任务执行结束且执行成功后将会主动触发一次子任务的执行,多个子任务用逗号分隔;
22、一致性:“调度中心” 通过 DB 锁保证集群分布式调度的一致性,一次任务调度只会触发一次执行;
23、自定义任务参数:支持在线配置调度任务入参,即时生效;
24、调度线程池:调度系统多线程触发调度运行,确保调度精确执行,不被堵塞;
25、数据加密:调度中心和执行器之间的通讯进行数据加密,提升调度信息安全性;
26、邮件报警:任务失败时支持邮件报警,支持配置多邮件地址群发报警邮件;
27、推送 maven 中央仓库:将会把最新稳定版推送到 maven 中央仓库,方便用户接入和使用;
28、运行报表:支持实时查看运行数据,如任务数量、调度次数、执行器数量等;以及调度报表,如调度日期分布图,调度成功分布图等;
29、全异步:任务调度流程全异步化设计实现,如异步调度、异步运行、异步回调等,有效对密集调度进行流量削峰,理论上支持任意时长任务的运行;
30、跨语言:调度中心与执行器提供语言无关的 RESTful API 服务,第三方任意语言可据此对接调度中心或者实现执行器。除此之外,还提供了 “多任务模式” 和 “httpJobHandler” 等其他跨语言方案;
31、国际化:调度中心支持国际化设置,提供中文、英文两种可选语言,默认为中文;
32、容器化:提供官方 docker 镜像,并实时更新推送 dockerhub,进一步实现产品开箱即用;
33、线程池隔离:调度线程池进行隔离拆分,慢任务自动降级进入 “Slow” 线程池,避免耗尽调度线程,提高系统稳定性;
34、用户管理:支持在线管理系统用户,存在管理员、普通用户两种角色;
35、权限控制:执行器维度进行权限控制,管理员拥有全量权限,普通用户需要分配执行器权限后才允许相关操作;
使用
到官网或者码云拿到xxl-job-admin模块代码,在对应数据库创建数据库表,运行项目中的doc中db下的数据库文件tables_xxl_job.sql
admin模块就是对应xxl-job的业务表的增删改查。
core对应xxl-job的业务执行逻辑。
调度中心访问地址:http://localhost:8080/xxl-job-admin (该地址执行器将会使用到,作为回调地址),默认登录账号 “admin/123456”。
自定义添加执行器
用调度中心的网页添加执行器、定时任务是非常简单的,这里不做多解释,傻瓜式操作罢了。
如果是项目内使用呢,还可以使用网页创建吗,这显然不行。接下来简绍一下自定义。
自定义模块:
自定义模块配置:需要根据自己部署的admin模块地址来修改
重写增删改查:
package com.cetc.xxljob.controller;
import com.cetc.common.core.web.domain.ResponseResult;
import com.cetc.xxljob.api.model.rq.XxlJobInfoRq;
import com.cetc.xxljob.domain.XxlJobInfo;
import com.cetc.xxljob.service.XxlJobService;
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.*;
@Slf4j
@RestController
@RequestMapping("/job")
public class XxlJobController {
@Autowired
XxlJobService jobService;
@ApiOperation("新增任务")
@PostMapping("/addJob")
public ResponseResult addJobInfo(@RequestBody @Validated XxlJobInfoRq rq) {
return jobService.addJobInfo(rq);
}
@ApiOperation("更新任务")
@PostMapping("/updateJob")
public ResponseResult updateJobCron(@RequestBody @Validated XxlJobInfoRq rq) {
return jobService.update(rq);
}
@ApiOperation("删除任务")
@PostMapping("/removeJob/{id}")
public ResponseResult removeJob(@PathVariable int id) {
return jobService.removeJob(id);
}
@ApiOperation("启动任务")
@PostMapping("/startJob/{id}")
public ResponseResult startJob(@PathVariable int id) {
return jobService.startJob(id);
}
@ApiOperation("暂停任务")
@PostMapping("/pauseJob/{id}")
public ResponseResult pauseJob(@PathVariable int id) {
return jobService.pauseJob(id);
}
@ApiOperation("新增并启动任务")
@PostMapping("/addAndStart")
public ResponseResult addAndStart(@RequestBody @Validated XxlJobInfoRq rq) {
return jobService.addAndStart(rq);
}
@ApiOperation("获取任务详情")
@PostMapping("/{id}")
public ResponseResult<XxlJobInfo> getJoInfo(@PathVariable int id) {
return jobService.getJobInfo(id);
}
@ApiOperation("验证cron")
@GetMapping("/checkCron")
public ResponseResult checkCron(String cron) {
return jobService.checkCronExpressionIsValid(cron);
}
@ApiOperation("获取cron的下一次执行时间")
@GetMapping("/nextTriggerTime")
public ResponseResult nextTriggerTime(String scheduleType, String scheduleConf) {
return jobService.nextTriggerTime(scheduleType,scheduleConf);
}
}
调用接口:
这里就会到对应模块的/start/{id}去调用对应服务。
业务模块:
到此,一次定时任务调度完毕。
至于怎样创建定时任务,我这里给个参考代码。
@Override
public ResponseResult createCallTask(OutboundDilapidatedReservoirs reservoirs) {
//创建定时任务
Calendar calendar = Calendar.getInstance();
calendar.setTime(reservoirs.getCallDate());
int year = calendar.get(Calendar.YEAR);//获取年份
int month = calendar.get(Calendar.MONTH) + 1;//获取月份
int day = calendar.get(Calendar.DATE);//日
int hours = calendar.get(Calendar.HOUR_OF_DAY);//小时
int min = calendar.get(Calendar.MINUTE);//分钟
int seconds = calendar.get(Calendar.SECOND);//秒
String cron = seconds + " " + min + " " + hours + " " + day + " " + month + " ? " + year;
ResponseResult result = xxlJobService.nextTriggerTime(XxlJobContants.scheduleType.CRON.getValue(),cron);
if (R.FAIL == result.getCode()) {
throw new BaseException(result.getMsg());
}
JSONArray array = JSON.parseArray(JSON.toJSONString(result.getData()));
if (array.size()==0 ||!DateUtils.getNowDate().before(DateUtils.parseDate(array.get(0)))){
throw new BaseException("任务时间已过期,请修改");
}
XxlJobInfoRq job = new XxlJobInfoRq();
job.setJobDesc("定时任务,ID:" + reservoirs.getId());
job.setAuthor(SecurityUtils.getUsername());
job.setScheduleType(XxlJobContants.scheduleType.CRON.getValue());
job.setScheduleConf(cron);
job.setMisfireStrategy(XxlJobContants.misfireStrategy.DO_NOTHING.getValue());
job.setExecutorRouteStrategy(XxlJobContants.executorRouteStrategy.FAILOVER.getValue());
job.setExecutorHandler(OutboundConstants.TASK_TYPE_DILAPIDATED_RESERVOIRS);
job.setExecutorBlockStrategy(XxlJobContants.executorBlockStrategy.SERIAL_EXECUTION.getValue());
job.setExecutorTimeout(0);
job.setExecutorParam(reservoirs.getId().toString());
job.setExecutorFailRetryCount(0);
job.setGlueType(XxlJobContants.glueType.BEAN.getValue());
job.setExecutorParam(reservoirs.getId().toString());
//新增并启动定时任务
ResponseResult jobResult = xxlJobService.addAndStartApi(job);
if (R.FAIL == jobResult.getCode()) {
throw new BaseException("新增定时任务失败,请重试!");
}
Integer taskId = Integer.parseInt((String) jobResult.getData());
if (taskId == null) {
throw new BaseException("未获取到定时任务ID");
}
reservoirs.setStatus(OutboundConstants.CALL_TASK_STATUS_RUN);
reservoirs.setTaskId(taskId);
updateById(reservoirs);
return ResponseResult.buildResponseResult(ResCodeEnum.SUCCESS);
}