分布式定时任务
- 1.为什么需要定时任务?
- 2.数据库实现分布式定时任务
- 3.基于redis实现
1.为什么需要定时任务?
因为有时候我们需要定时的执行一些操作,比如业务中产生的一些临时文件,临时文件不能立即删除,因为不清楚用户是否操作完毕,不能立即删除,需要隔一段时间然后定时清楚,还有像是一些电商项目,每月进行数据清算。比如某些业务的排行榜,实时性不是高的也可以使用定时任务去统计,然后在做更新。但是我们现在大多数的应用都是分布式的?相当于你写的一个定时任务会在多个子系统中运行,而且是同时的,我们只需要其中一个任务运行就可以了,如果多次运行不仅会无故消耗系统资源,还会导致任务执行出现意外,那么怎么保证这个任务只执行一次呢?其实解决方案有很多。
分布式任务执行出现的问题,如下图所示:
- 使用数据库唯一约束加锁
- 使用redis的setNX命令
- 使用分布式框架Quartz,TBSchedule,elastic-job,Saturn,xxl-job等
当然技术是为业务服务的,我们怎么选择合适的技术,还得是看业务场景,比如一些任务的执行频率不高,也不是特别要求效率,也不复杂,我们完全用不上为了一个定时任务去引入一些第三方的框架作为定时任务实现,我们来介绍两种方式来实现分布式定时任务。
2.数据库实现分布式定时任务
数据库实现定时任务的核心思路:
- 需要两张表,一张定时任务配置表,还有一张定时任务运行记录表
- 任务配置表有一个唯一约束字段,运行记录表由运行日期+任务名称作为唯一约束,这是实现的核心思路
- 使用注解+aop对定时任务进行代理,统一进行加锁操作,避免多次运行
- 这种适合任务不频繁,一天在某个时间点执行,对性能要求不高的业务场景,实现起来也比较简单
表SQL语句:
-- 任务运行记录表
CREATE TABLE `task_record` (
`ID` varchar(20) NOT NULL COMMENT 'ID',
`start_time` datetime DEFAULT NULL COMMENT '定时任务开始时间',
`ent_time` datetime DEFAULT NULL COMMENT '定时任务结束时间',
`is_success` varchar(1) DEFAULT NULL COMMENT '是否执行成功',
`error_cause` longtext COMMENT '失败原因',
`task_name` varchar(100) NOT NULL COMMENT '任务名称',
`run_date` varchar(6) DEFAULT NULL COMMENT '运行日期',
PRIMARY KEY (`ID`),
UNIQUE KEY `run_date_task_name_idx` (`run_date`,`task_name`) USING BTREE COMMENT '运行日期+任务名称作为唯一约束'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='定时任务运行记录表';
-- 任务配置表
CREATE TABLE `task_config` (
`id` varchar(32) NOT NULL COMMENT '序号',
`task_describe` varchar(225) DEFAULT NULL COMMENT '任务描述',
`task_name` varchar(100) DEFAULT NULL COMMENT '任务名称',
`task_valid` varchar(1) DEFAULT NULL COMMENT '任务有效标志',
`create_time` datetime DEFAULT NULL COMMENT '创建时间',
PRIMARY KEY (`id`),
UNIQUE KEY `task_index` (`task_name`) COMMENT '唯一性约束'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='定时任务配置表';
1.定时任务标识注解:
/**
* 标注在定时任务上,避免多个微服务的情况下定时任务执行重复
* @author compass
* @date 2023-03-09
* @since 1.0
**/
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface DatabaseLock {
//定时任务使用的键,千万不要重复
String lockName() default "";
//定时任务描述
String lockDesc() default "";
}
2.使用aop代理定时任务方法进行拦截
/**
* 代理具体的定时任务,仅有一个任务会被成功执行
*
* @author compass
* @date 2023-03-09
* @since 1.0
**/
@Aspect
@Slf4j
@Component
public class DatabaseLockAspect {
@Resource
private TaskService taskService;
private static final String TASK_IS_VALID = "1";
@Around("@annotation( com.springboot.example.annotation.DatabaseLock)")
public Object cacheLockPoint(ProceedingJoinPoint pjp) {
Date startTime = new Date();
TaskRecord taskRecord = new TaskRecord();
String isRunSuccess = "1";
String taskConfigId;
String errorCause = "";
Boolean currentDayRunRecord ;
Method cacheMethod = null;
for (Method method : pjp.getTarget().getClass().getMethods()) {
if (null != method.getAnnotation(DatabaseLock.class)) {
cacheMethod = method;
break;
}
}
if (cacheMethod != null) {
String lockName = cacheMethod.getAnnotation(DatabaseLock.class).lockName();
String lockDesc = cacheMethod.getAnnotation(DatabaseLock.class).lockDesc();
// 运行主键,避免多次运行核心关键
String runDate = DateUtil.format(new Date(), "yyyyMMdd");
String taskRecordId = IdUtil.getSnowflakeNextIdStr();
taskRecord.setTaskName(lockName);
taskRecord.setId(taskRecordId);
taskRecord.setRunDate(runDate);
if (StringUtils.isBlank(lockName)) {
throw new RuntimeException("定时任务锁名称不能为空");
}
if (StringUtils.isBlank(lockDesc)) {
throw new RuntimeException("定时任务锁描述不能为空");
}
TaskConfig taskConfig = taskService.hasRun(lockName);
// 还未运行过,进行初始化处理
if (taskConfig == null) {
TaskConfig config = new TaskConfig();
taskConfigId = IdUtil.getSnowflakeNextIdStr();
config.setId(taskConfigId);
config.setTaskDescribe(lockDesc);
config.setTaskName(lockName);
config.setTaskValid("1");
config.setCreateTime(new Date());
try {
// 添加时出现异常,已经运行过该定时任务
taskService.addTaskConfig(config);
taskConfig = config;
} catch (Exception e) {
e.printStackTrace();
}
// 有效标志位0表示无需执行
} else if (!TASK_IS_VALID.equals(taskConfig.getTaskValid())) {
String message = "该定时任务已经禁用";
log.warn("method:{}未获取锁:{}[运行失败原因:{}]", cacheMethod, lockName, message);
throw new RuntimeException(String.format("method:%s未获取锁:%s[运行失败原因:%s]", cacheMethod, lockName, message));
}
// 添加运行记录,以runKey为唯一标识,插入异常,说明执行过
try {
currentDayRunRecord = taskService.addCurrentDayRunRecord(taskRecord);
} catch (Exception e) {
log.warn("method:{}未获取锁:{}[运行失败原因:已经有别的服务进行执行]", cacheMethod, lockName);
return null;
}
// 没有执行过,开始执行
if (currentDayRunRecord) {
try {
log.warn("method:{}获取锁:{},运行成功!", cacheMethod, lockName);
return pjp.proceed();
} catch (Throwable e) {
e.printStackTrace();
isRunSuccess = "0";
errorCause = ExceptionUtils.getExceptionDetail(e);
} finally {
Date endTime = new Date();
taskRecord.setStartTime(startTime);
taskRecord.setId(IdUtil.getSnowflakeNextIdStr());
taskRecord.setEntTime(endTime);
taskRecord.setIsSuccess(isRunSuccess);
taskRecord.setErrorCause(errorCause);
// 修改运行记录
taskService.updateTaskRunRecord(taskRecord);
}
}
}
return null;
}
}
3.TaskService实现操作数据库接口与实现
public interface TaskService {
/**
* 判断定时任务是否运行过
*
* @param taskName
* @return com.springboot.example.bean.task.TaskConfig
* @author compass
* @date 2023/3/10 21:22
* @since 1.0.0
**/
TaskConfig hasRun(String taskName);
/**
* 将首次运行的任务添加到任务配置表
*
* @param taskConfig
* @return java.lang.Boolean
* @author compass
* @date 2023/3/10 21:23
* @since 1.0.0
**/
Boolean addTaskConfig(TaskConfig taskConfig);
/**
* 更新定时任务运行记录
*
* @param taskRecord
* @return java.lang.Boolean
* @author compass
* @date 2023/3/10 21:23
* @since 1.0.0
**/
Boolean updateTaskRunRecord(TaskRecord taskRecord);
/**
* 新增一条运行记录,只有新增成功的服务才可以得到运行劝
* @param taskRecord
* @return java.lang.Boolean
* @author compass
* @date 2023/3/10 21:23
* @since 1.0.0
**/
Boolean addCurrentDayRunRecord(TaskRecord taskRecord);
}
@Slf4j
@Service
public class TaskServiceImpl implements TaskService {
@Resource
private TaskConfigMapper taskConfigMapper;
@Resource
private TaskRecordMapper taskRecordMapper;
@Override
public TaskConfig hasRun(String taskName) {
QueryWrapper<TaskConfig> wrapper = new QueryWrapper<>();
wrapper.eq("task_name",taskName);
return taskConfigMapper.selectOne(wrapper);
}
@Override
public Boolean addTaskConfig(TaskConfig taskConfig) {
return taskConfigMapper.insert(taskConfig)>0;
}
@Override
public Boolean updateTaskRunRecord(TaskRecord taskRecord ) {
QueryWrapper<TaskRecord> wrapper = new QueryWrapper<>();
wrapper.eq("task_name",taskRecord.getTaskName());
wrapper.eq("run_date",taskRecord.getRunDate());
return taskRecordMapper.update(taskRecord,wrapper)>0;
}
@Override
public Boolean addCurrentDayRunRecord(TaskRecord taskRecord) {
return taskRecordMapper.insert(taskRecord)>0;
}
}
4.数据库对应的实体类
// 配置类
@Data
@TableName("task_config")
public class TaskConfig {
/**
* 序号
*/
@TableId(value = "id", type = IdType.ASSIGN_ID)
private String id;
/**
* 任务描述
*/
private String taskDescribe;
/**
* 任务名称
*/
private String taskName;
/**
* 任务有效标志
*/
private String taskValid;
/**
* 创建时间
*/
private Date createTime;
}
// 运行记录类
@Data
@TableName("task_record")
public class TaskRecord {
/**
* ID
*/
@TableId(value = "id", type = IdType.ASSIGN_ID)
private String id;
/**
* 定时任务开始时间
*/
private Date startTime;
/**
* 定时任务结束时间
*/
private Date entTime;
/**
* 是否执行成功
*/
private String isSuccess;
/**
* 失败原因
*/
private String errorCause;
/**
* 运行日期[yyyyMMdd]
*/
private String runDate;
/**
* 任务名称(任务名称+运行日期为唯一索引)
*/
private String taskName;
}
3.基于redis实现
- 主要是基于setNX来实现的,setNX表示这个key存在,则设置value失败,只有这个key不存在的时候,才会set成功
- 我们可以给这个key指定过期时间,让他一定会释放锁,不然容易出现死锁的情况
1.操作redis锁的工具类
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
/**
* redis锁工具类,如果redis是集群的话需要考虑数据延时性,这里默认为redis单个节点
*
* @author compass
* @date 2023-03-10
* @since 1.0
**/
@SuppressWarnings(value = {"all"})
@Component
public class RedisLockUtils {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
/**
* 加锁
*
* @param key
* @param value
* @param time
* @param timeUnit
* @return boolean
* @author compass
* @date 2023/3/10 22:13
* @since 1.0.0
**/
public boolean lock(String key, String value, long time, TimeUnit timeUnit) {
return (Boolean)redisTemplate.execute((RedisCallback) connection -> {
Boolean setNX = connection.setNX(key.getBytes(), value.getBytes());
if (setNX){
return connection.expire(key.getBytes(),time);
}
return false;
});
}
/**
* 立即释放锁,如果任务执行的非常快,可能会导致其他应用获得到锁,二次执行
*
* @param key
* @return boolean
* @author compass
* @date 2023/3/10 22:13
* @since 1.0.0
**/
public boolean fastReleaseLock(String key) {
return redisTemplate.delete(key);
}
/**
* 缓慢释放锁(隔离小段时间再释放锁,可以完全避免掉别的应用获取到锁)
*
* @param key
* @param time
* @param timeUnit
* @return boolean
* @author compass
* @date 2023/3/10 22:13
* @since 1.0.0
**/
public boolean turtleReleaseLock(String key, long time, TimeUnit timeUnit) {
return redisTemplate.expire(key, time, timeUnit);
}
}
2.aop切入,统一管理定时任务
import com.springboot.example.annotation.CacheLock;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.lang.reflect.Method;
import java.util.concurrent.TimeUnit;
/**
* 代理具体的定时任务,仅有一个任务会被成功执行
*
* @author compass
* @date 2023-03-09
* @since 1.0
**/
@Aspect
@Slf4j
@Component
public class CacheLockAspect {
@Resource
private RedisLockUtils redisLockUtils;
/**
* 加锁值,可以是任意值
**/
private static final String LOCK_VALUE = "1";
@Around("@annotation(com.springboot.example.annotation.CacheLock)")
public Object cacheLockPoint(ProceedingJoinPoint pjp) {
Method cacheMethod = null;
for (Method method : pjp.getTarget().getClass().getMethods()) {
if (null != method.getAnnotation(CacheLock.class)) {
cacheMethod = method;
break;
}
}
if (cacheMethod!=null){
CacheLock cacheLock = cacheMethod.getAnnotation(CacheLock.class);
String lockName = cacheLock.lockName();
long time = cacheLock.timeOut();
boolean successLock = redisLockUtils.lock(lockName,LOCK_VALUE, time, TimeUnit.SECONDS);
if (successLock){
log.info("method:{}获取锁成功:{}", cacheMethod, lockName);
try {
// 获得锁调用被代理的定时任务
return pjp.proceed();
} catch (Throwable throwable) {
throwable.printStackTrace();
}finally {
// 延时5秒再去释放锁
redisLockUtils.turtleReleaseLock(lockName,5,TimeUnit.SECONDS);
}
}else {
log.warn("method:{}获取锁失败:{}", cacheMethod, lockName);
}
}
return null;
}
}
3.自定义注解
/**
* 标注在定时任务上,避免多个微服务的情况下定时任务执行重复
* @author compass
* @date 2023-03-09
* @since 1.0
**/
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface CacheLock {
//定时任务使用的键,千万不要重复
String lockName() ;
// 占用锁的时间,单位是秒,默认10分钟
long timeOut() default 60*10;
}
今天就先介绍这两种方式,后续我再为大家续上使用别的框架进行实现,不过在实现的过程中使用 redisTemplate.opsForValue().setIfAbsent() 出现了一点小肯,他返回的是null值,然后出现空指针,然后我不得不采用execute的方式去执行。