我没有失约,我与春风共至,我与夏蝉共鸣,我与秋叶共舞,我与凛冬共至,唯独你背道而行!
系列文章目录
- 项目搭建
- App登录及网关
- App文章
- 自媒体平台(博主后台)
- 内容审核(自动)
- 延迟任务 - 精准发布文章
文章目录
- 系列文章目录
- 一、延迟任务概述
- 1.什么是延迟任务
- 2. 技术选型
- ⑴. DelayQueue
- ⑵. RabbitMQ实现延迟任务
- ⑶. redis实现
- 二、redis实现延迟任务
- 1. redis实现思路
- 2. 导入微服务
- ⑴. 模块包
- ⑵. Pom配置
- ⑶. Nacos配置
- 3. 表结构分析
- 4. 实体类
- 5. 乐观锁
- ⑴. 解决并发策略
- ⑵. 启动类
- 6. redis
- ⑴. 容器
- ⑵. redis连接
- ⑶. 集成
- ①. 依赖
- ②. Nacos配置
- ③. 工具类
- ④. 自动配置
- ⑷. 测试
- ①. list
- ②. Zset
- 7. 服务实现
- ⑴. Mapper
- ①. taskinfoMapper
- ②. xml
- ③. taskinfoLogsMapper
- ⑵. Dto
- ⑶. 常量
- ⑷. Service
- ⑸. ServiceImpl
- ⑹. 测试
- ①. 当下时间
- ②. 未来时间
- ③. 未来时间(超过五分钟)
- 三、延迟对列服务
- 1. 取消任务
- ⑴. 需求分析
- ⑵. Service
- ⑶. ServiceImpl
- ⑷. 测试类
- 2. 消费任务/拉取任务
- ⑴. 需求分析
- ⑵. Service
- ⑶. ServiceImpl
- ⑷. 测试类
- ①. 初始数据
- ②. 测试类
- ③. Result
- 3. 定时刷新
- ⑴. reids key值匹配
- ①. keys 模糊匹配
- ②. scan
- ③. 测试类
- ⑵. ServiceImpl
- ⑶. 启动类
- ⑷. 测试类
- ①. 初始数据
- ②. 定时刷新
- 4. redis分布式锁
- ⑴. 需求分析
- ⑵. 存在的问题
- ①. 服务配置
- ②. 创建服务
- ③. 启动服务
- ⑶. 工具类
- ⑷. 定时刷新加锁
- ⑸. 测试
- 5. 数据库同步到redis
- ⑴. 需求分析
- ⑵. 初始数据
- ⑶. 同步至redis
- ⑷. 测试
- 6. 延迟对列接口定义
- ⑴. 提供对外接口
- ⑵. 微服务实现接口
- 7. 添加延迟任务
- ⑴. 序列化
- ①. 序列化工具对比
- ②. pom依赖
- ③. JdkSerialize
- ④. Protostuff
- ⑤. 测试
- ⑵. 枚举
- ⑶. Service
- ⑷. ServiceImpl
- ⑸. 调用延迟任务
- ⑹. 测试
- 8. 消费任务、审核文章
- ⑴. Service
- ⑵. ServiceImpl
- ⑶. 引导类
- ⑷. 测试
一、延迟任务概述
1.什么是延迟任务
- 定时任务: 有固定周期的,有明确的触发时间
- 延迟队列: 没有固定的开始时间,它常常是由一个事件触发的,而在这个事件触发之后的一段时间内触发另一个事件,任务可以立即执行,也可以延迟
应用场景:
- 场景一: 订单下单之后30分钟后,如果用户没有付钱,则系统自动取消订单;如果期间下单成功,任务取消
- 场景二: 接口对接出现网络问题,1分钟后重试,如果失败,2分钟重试,直到出现阈值终止
2. 技术选型
⑴. DelayQueue
JDK自带DelayQueue 是一个支持延时获取元素的阻塞队列, 内部采用优先队列 PriorityQueue 存储元素,同时元素必须实现 Delayed 接口;在创建元素时可以指定多久才可以从队列中获取当前元素,只有在延迟期满时才能从队列中提取元素
使用线程池或者原生DelayQueue程序挂掉之后,任务都是放在内存,需要考虑未处理消息的丢失带来的影响,如何保证数据不丢失,需要持久化(磁盘)
⑵. RabbitMQ实现延迟任务
- TTL: Time To Live (消息存活时间)
- 死信队列: Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以重新发送另一个交换机(死信交换机)
⑶. redis实现
zset数据类型的去重有序(分数排序)特点进行延迟。例如:时间戳作为score进行排序
例如:
生产者添加到4个任务到延迟队列中,时间毫秒值分别为97、98、99、100。当前时间的毫秒值为90
消费者端进行监听,如果当前时间的毫秒值匹配到了延迟队列中的毫秒值就立即消费
二、redis实现延迟任务
1. redis实现思路
问题思路:
- 为什么任务需要存储在数据库中?
延迟任务是一个通用的服务,任何需要延迟得任务都可以调用该服务,需要考虑数据持久化的问题,存储数据库中是一种数据安全的考虑。 - 为什么redis中使用两种数据类型,list和zset?
效率问题,算法的时间复杂度 - 在添加zset数据的时候,为什么不需要预加载?
任务模块是一个通用的模块,项目中任何需要延迟队列的地方,都可以调用这个接口,要考虑到数据量的问题,如果数据量特别大,为了防止阻塞,只需要把未来几分钟要执行的数据存入缓存即可。
2. 导入微服务
⑴. 模块包
资源链接: https://pan.baidu.com/s/1zKW2mT2R_XAq95kzuk9B2Q?pwd=abcd
解压至 heima-leadnews-service
目录下
⑵. Pom配置
编辑 heima-leadnews-service/pom.xml
文件:
<!--延迟任务模块-->
<module>heima-leadnews-schedule</module>
⑶. Nacos配置
添加 leadnews-schedule
配置:
spring:
datasource:
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://localhost:3306/leadnews_schedule?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC
username: root
password: 123456
# 设置Mapper接口所对应的XML文件位置,如果你在Mapper接口中有自定义方法,需要进行该配置
mybatis-plus:
mapper-locations: classpath*:mapper/*.xml
# 设置别名包扫描路径,通过该属性可以给包中的类注册别名
type-aliases-package: com.heima.model.schedule.pojos
3. 表结构分析
sql链接: https://pan.baidu.com/s/1bAmVJxP8NSKOBZAwoKJGHg?pwd=abcd
taskinfo 任务表
taskinfo_logs 任务日志表
MySQL中,BLOB是一个二进制大型对象,是一个可以存储大量数据的容器;LongBlob 最大存储 4G
4. 实体类
新建 heima-leadnews-model/src/main/java/com/heima/model/schedule/pojos/Taskinfo.java
文件:
/**
* <p>
*
* </p>
*
* @author itheima
*/
@Data
@TableName("taskinfo")
public class Taskinfo implements Serializable {
private static final long serialVersionUID = 1L;
/**
* 任务id
*/
@TableId(type = IdType.ID_WORKER)
private Long taskId;
/**
* 执行时间
*/
@TableField("execute_time")
private Date executeTime;
/**
* 参数
*/
@TableField("parameters")
private byte[] parameters;
/**
* 优先级
*/
@TableField("priority")
private Integer priority;
/**
* 任务类型
*/
@TableField("task_type")
private Integer taskType;
}
新建 heima-leadnews-model/src/main/java/com/heima/model/schedule/pojos/TaskinfoLogs.java
文件:
/**
* <p>
*
* </p>
*
* @author itheima
*/
@Data
@TableName("taskinfo_logs")
public class TaskinfoLogs implements Serializable {
private static final long serialVersionUID = 1L;
/**
* 任务id
*/
@TableId(type = IdType.ID_WORKER)
private Long taskId;
/**
* 执行时间
*/
@TableField("execute_time")
private Date executeTime;
/**
* 参数
*/
@TableField("parameters")
private byte[] parameters;
/**
* 优先级
*/
@TableField("priority")
private Integer priority;
/**
* 任务类型
*/
@TableField("task_type")
private Integer taskType;
/**
* 版本号,用乐观锁
*/
@Version
private Integer version;
/**
* 状态 0=int 1=EXECUTED 2=CANCELLED
*/
@TableField("status")
private Integer status;
}
5. 乐观锁
⑴. 解决并发策略
悲观锁(Pessimistic Lock)
每次去拿数据的时候都认为别人会修改,所以每次在拿数据的时候都会上锁
乐观锁(Optimistic Lock)
每次去拿数据的时候都认为别人不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有去更新这个数据,可以使用版本号等机制
⑵. 启动类
编辑 heima-leadnews-service/heima-leadnews-schedule/src/main/java/com/heima/schedule/ScheduleApplication.java
文件:
/**
* mybatis-plus乐观锁支持
* @return
*/
@Bean
public MybatisPlusInterceptor optimisticLockerInterceptor(){
MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();
interceptor.addInnerInterceptor(new OptimisticLockerInnerInterceptor());
return interceptor;
}
6. redis
⑴. 容器
# 列出本地镜像
docker images
# 创建容器
docker run -d --name redis --restart=always -p 6379:6379 redis --requirepass "leadnews"
# 列出容器
docker ps
⑵. redis连接
redis安装链接:https://pan.baidu.com/s/1IzSRs6JPAxM64IomZv8U6g?pwd=abcd
⑶. 集成
①. 依赖
编辑 heima-leadnews-common/pom.xml
文件:
<!--spring data redis & cache -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- redis依赖commons-pool 这个依赖一定要添加 -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>
②. Nacos配置
编辑 leadnews-schedule
配置:
spring:
redis:
host: 192.168.200.130
password: leadnews
port: 6379
③. 工具类
新建 heima-leadnews-common/src/main/java/com/heima/common/redis/CacheService.java
文件:
@Component
public class CacheService extends CachingConfigurerSupport {
@Autowired
private StringRedisTemplate stringRedisTemplate;
public StringRedisTemplate getstringRedisTemplate() {
return this.stringRedisTemplate;
}
/** -------------------key相关操作--------------------- */
/**
* 删除key
*
* @param key
*/
public void delete(String key) {
stringRedisTemplate.delete(key);
}
/**
* 批量删除key
*
* @param keys
*/
public void delete(Collection<String> keys) {
stringRedisTemplate.delete(keys);
}
/**
* 序列化key
*
* @param key
* @return
*/
public byte[] dump(String key) {
return stringRedisTemplate.dump(key);
}
/**
* 是否存在key
*
* @param key
* @return
*/
public Boolean exists(String key) {
return stringRedisTemplate.hasKey(key);
}
/**
* 设置过期时间
*
* @param key
* @param timeout
* @param unit
* @return
*/
public Boolean expire(String key, long timeout, TimeUnit unit) {
return stringRedisTemplate.expire(key, timeout, unit);
}
/**
* 设置过期时间
*
* @param key
* @param date
* @return
*/
public Boolean expireAt(String key, Date date) {
return stringRedisTemplate.expireAt(key, date);
}
/**
* 查找匹配的key
*
* @param pattern
* @return
*/
public Set<String> keys(String pattern) {
return stringRedisTemplate.keys(pattern);
}
/**
* 将当前数据库的 key 移动到给定的数据库 db 当中
*
* @param key
* @param dbIndex
* @return
*/
public Boolean move(String key, int dbIndex) {
return stringRedisTemplate.move(key, dbIndex);
}
/**
* 移除 key 的过期时间,key 将持久保持
*
* @param key
* @return
*/
public Boolean persist(String key) {
return stringRedisTemplate.persist(key);
}
/**
* 返回 key 的剩余的过期时间
*
* @param key
* @param unit
* @return
*/
public Long getExpire(String key, TimeUnit unit) {
return stringRedisTemplate.getExpire(key, unit);
}
/**
* 返回 key 的剩余的过期时间
*
* @param key
* @return
*/
public Long getExpire(String key) {
return stringRedisTemplate.getExpire(key);
}
/**
* 从当前数据库中随机返回一个 key
*
* @return
*/
public String randomKey() {
return stringRedisTemplate.randomKey();
}
/**
* 修改 key 的名称
*
* @param oldKey
* @param newKey
*/
public void rename(String oldKey, String newKey) {
stringRedisTemplate.rename(oldKey, newKey);
}
/**
* 仅当 newkey 不存在时,将 oldKey 改名为 newkey
*
* @param oldKey
* @param newKey
* @return
*/
public Boolean renameIfAbsent(String oldKey, String newKey) {
return stringRedisTemplate.renameIfAbsent(oldKey, newKey);
}
/**
* 返回 key 所储存的值的类型
*
* @param key
* @return
*/
public DataType type(String key) {
return stringRedisTemplate.type(key);
}
/** -------------------string相关操作--------------------- */
/**
* 设置指定 key 的值
* @param key
* @param value
*/
public void set(String key, String value) {
stringRedisTemplate.opsForValue().set(key, value);
}
/**
* 获取指定 key 的值
* @param key
* @return
*/
public String get(String key) {
return stringRedisTemplate.opsForValue().get(key);
}
/**
* 返回 key 中字符串值的子字符
* @param key
* @param start
* @param end
* @return
*/
public String getRange(String key, long start, long end) {
return stringRedisTemplate.opsForValue().get(key, start, end);
}
/**
* 将给定 key 的值设为 value ,并返回 key 的旧值(old value)
*
* @param key
* @param value
* @return
*/
public String getAndSet(String key, String value) {
return stringRedisTemplate.opsForValue().getAndSet(key, value);
}
/**
* 对 key 所储存的字符串值,获取指定偏移量上的位(bit)
*
* @param key
* @param offset
* @return
*/
public Boolean getBit(String key, long offset) {
return stringRedisTemplate.opsForValue().getBit(key, offset);
}
/**
* 批量获取
*
* @param keys
* @return
*/
public List<String> multiGet(Collection<String> keys) {
return stringRedisTemplate.opsForValue().multiGet(keys);
}
/**
* 设置ASCII码, 字符串'a'的ASCII码是97, 转为二进制是'01100001', 此方法是将二进制第offset位值变为value
*
* @param key
* @param
* @param value
* 值,true为1, false为0
* @return
*/
public boolean setBit(String key, long offset, boolean value) {
return stringRedisTemplate.opsForValue().setBit(key, offset, value);
}
/**
* 将值 value 关联到 key ,并将 key 的过期时间设为 timeout
*
* @param key
* @param value
* @param timeout
* 过期时间
* @param unit
* 时间单位, 天:TimeUnit.DAYS 小时:TimeUnit.HOURS 分钟:TimeUnit.MINUTES
* 秒:TimeUnit.SECONDS 毫秒:TimeUnit.MILLISECONDS
*/
public void setEx(String key, String value, long timeout, TimeUnit unit) {
stringRedisTemplate.opsForValue().set(key, value, timeout, unit);
}
/**
* 只有在 key 不存在时设置 key 的值
*
* @param key
* @param value
* @return 之前已经存在返回false,不存在返回true
*/
public boolean setIfAbsent(String key, String value) {
return stringRedisTemplate.opsForValue().setIfAbsent(key, value);
}
/**
* 用 value 参数覆写给定 key 所储存的字符串值,从偏移量 offset 开始
*
* @param key
* @param value
* @param offset
* 从指定位置开始覆写
*/
public void setRange(String key, String value, long offset) {
stringRedisTemplate.opsForValue().set(key, value, offset);
}
/**
* 获取字符串的长度
*
* @param key
* @return
*/
public Long size(String key) {
return stringRedisTemplate.opsForValue().size(key);
}
/**
* 批量添加
*
* @param maps
*/
public void multiSet(Map<String, String> maps) {
stringRedisTemplate.opsForValue().multiSet(maps);
}
/**
* 同时设置一个或多个 key-value 对,当且仅当所有给定 key 都不存在
*
* @param maps
* @return 之前已经存在返回false,不存在返回true
*/
public boolean multiSetIfAbsent(Map<String, String> maps) {
return stringRedisTemplate.opsForValue().multiSetIfAbsent(maps);
}
/**
* 增加(自增长), 负数则为自减
*
* @param key
* @param
* @return
*/
public Long incrBy(String key, long increment) {
return stringRedisTemplate.opsForValue().increment(key, increment);
}
/**
*
* @param key
* @param
* @return
*/
public Double incrByFloat(String key, double increment) {
return stringRedisTemplate.opsForValue().increment(key, increment);
}
/**
* 追加到末尾
*
* @param key
* @param value
* @return
*/
public Integer append(String key, String value) {
return stringRedisTemplate.opsForValue().append(key, value);
}
/** -------------------hash相关操作------------------------- */
/**
* 获取存储在哈希表中指定字段的值
*
* @param key
* @param field
* @return
*/
public Object hGet(String key, String field) {
return stringRedisTemplate.opsForHash().get(key, field);
}
/**
* 获取所有给定字段的值
*
* @param key
* @return
*/
public Map<Object, Object> hGetAll(String key) {
return stringRedisTemplate.opsForHash().entries(key);
}
/**
* 获取所有给定字段的值
*
* @param key
* @param fields
* @return
*/
public List<Object> hMultiGet(String key, Collection<Object> fields) {
return stringRedisTemplate.opsForHash().multiGet(key, fields);
}
public void hPut(String key, String hashKey, String value) {
stringRedisTemplate.opsForHash().put(key, hashKey, value);
}
public void hPutAll(String key, Map<String, String> maps) {
stringRedisTemplate.opsForHash().putAll(key, maps);
}
/**
* 仅当hashKey不存在时才设置
*
* @param key
* @param hashKey
* @param value
* @return
*/
public Boolean hPutIfAbsent(String key, String hashKey, String value) {
return stringRedisTemplate.opsForHash().putIfAbsent(key, hashKey, value);
}
/**
* 删除一个或多个哈希表字段
*
* @param key
* @param fields
* @return
*/
public Long hDelete(String key, Object... fields) {
return stringRedisTemplate.opsForHash().delete(key, fields);
}
/**
* 查看哈希表 key 中,指定的字段是否存在
*
* @param key
* @param field
* @return
*/
public boolean hExists(String key, String field) {
return stringRedisTemplate.opsForHash().hasKey(key, field);
}
/**
* 为哈希表 key 中的指定字段的整数值加上增量 increment
*
* @param key
* @param field
* @param increment
* @return
*/
public Long hIncrBy(String key, Object field, long increment) {
return stringRedisTemplate.opsForHash().increment(key, field, increment);
}
/**
* 为哈希表 key 中的指定字段的整数值加上增量 increment
*
* @param key
* @param field
* @param delta
* @return
*/
public Double hIncrByFloat(String key, Object field, double delta) {
return stringRedisTemplate.opsForHash().increment(key, field, delta);
}
/**
* 获取所有哈希表中的字段
*
* @param key
* @return
*/
public Set<Object> hKeys(String key) {
return stringRedisTemplate.opsForHash().keys(key);
}
/**
* 获取哈希表中字段的数量
*
* @param key
* @return
*/
public Long hSize(String key) {
return stringRedisTemplate.opsForHash().size(key);
}
/**
* 获取哈希表中所有值
*
* @param key
* @return
*/
public List<Object> hValues(String key) {
return stringRedisTemplate.opsForHash().values(key);
}
/**
* 迭代哈希表中的键值对
*
* @param key
* @param options
* @return
*/
public Cursor<Map.Entry<Object, Object>> hScan(String key, ScanOptions options) {
return stringRedisTemplate.opsForHash().scan(key, options);
}
/** ------------------------list相关操作---------------------------- */
/**
* 通过索引获取列表中的元素
*
* @param key
* @param index
* @return
*/
public String lIndex(String key, long index) {
return stringRedisTemplate.opsForList().index(key, index);
}
/**
* 获取列表指定范围内的元素
*
* @param key
* @param start
* 开始位置, 0是开始位置
* @param end
* 结束位置, -1返回所有
* @return
*/
public List<String> lRange(String key, long start, long end) {
return stringRedisTemplate.opsForList().range(key, start, end);
}
/**
* 存储在list头部
*
* @param key
* @param value
* @return
*/
public Long lLeftPush(String key, String value) {
return stringRedisTemplate.opsForList().leftPush(key, value);
}
/**
*
* @param key
* @param value
* @return
*/
public Long lLeftPushAll(String key, String... value) {
return stringRedisTemplate.opsForList().leftPushAll(key, value);
}
/**
*
* @param key
* @param value
* @return
*/
public Long lLeftPushAll(String key, Collection<String> value) {
return stringRedisTemplate.opsForList().leftPushAll(key, value);
}
/**
* 当list存在的时候才加入
*
* @param key
* @param value
* @return
*/
public Long lLeftPushIfPresent(String key, String value) {
return stringRedisTemplate.opsForList().leftPushIfPresent(key, value);
}
/**
* 如果pivot存在,再pivot前面添加
*
* @param key
* @param pivot
* @param value
* @return
*/
public Long lLeftPush(String key, String pivot, String value) {
return stringRedisTemplate.opsForList().leftPush(key, pivot, value);
}
/**
*
* @param key
* @param value
* @return
*/
public Long lRightPush(String key, String value) {
return stringRedisTemplate.opsForList().rightPush(key, value);
}
/**
*
* @param key
* @param value
* @return
*/
public Long lRightPushAll(String key, String... value) {
return stringRedisTemplate.opsForList().rightPushAll(key, value);
}
/**
*
* @param key
* @param value
* @return
*/
public Long lRightPushAll(String key, Collection<String> value) {
return stringRedisTemplate.opsForList().rightPushAll(key, value);
}
/**
* 为已存在的列表添加值
*
* @param key
* @param value
* @return
*/
public Long lRightPushIfPresent(String key, String value) {
return stringRedisTemplate.opsForList().rightPushIfPresent(key, value);
}
/**
* 在pivot元素的右边添加值
*
* @param key
* @param pivot
* @param value
* @return
*/
public Long lRightPush(String key, String pivot, String value) {
return stringRedisTemplate.opsForList().rightPush(key, pivot, value);
}
/**
* 通过索引设置列表元素的值
*
* @param key
* @param index
* 位置
* @param value
*/
public void lSet(String key, long index, String value) {
stringRedisTemplate.opsForList().set(key, index, value);
}
/**
* 移出并获取列表的第一个元素
*
* @param key
* @return 删除的元素
*/
public String lLeftPop(String key) {
return stringRedisTemplate.opsForList().leftPop(key);
}
/**
* 移出并获取列表的第一个元素, 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止
*
* @param key
* @param timeout
* 等待时间
* @param unit
* 时间单位
* @return
*/
public String lBLeftPop(String key, long timeout, TimeUnit unit) {
return stringRedisTemplate.opsForList().leftPop(key, timeout, unit);
}
/**
* 移除并获取列表最后一个元素
*
* @param key
* @return 删除的元素
*/
public String lRightPop(String key) {
return stringRedisTemplate.opsForList().rightPop(key);
}
/**
* 移出并获取列表的最后一个元素, 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止
*
* @param key
* @param timeout
* 等待时间
* @param unit
* 时间单位
* @return
*/
public String lBRightPop(String key, long timeout, TimeUnit unit) {
return stringRedisTemplate.opsForList().rightPop(key, timeout, unit);
}
/**
* 移除列表的最后一个元素,并将该元素添加到另一个列表并返回
*
* @param sourceKey
* @param destinationKey
* @return
*/
public String lRightPopAndLeftPush(String sourceKey, String destinationKey) {
return stringRedisTemplate.opsForList().rightPopAndLeftPush(sourceKey,
destinationKey);
}
/**
* 从列表中弹出一个值,将弹出的元素插入到另外一个列表中并返回它; 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止
*
* @param sourceKey
* @param destinationKey
* @param timeout
* @param unit
* @return
*/
public String lBRightPopAndLeftPush(String sourceKey, String destinationKey,
long timeout, TimeUnit unit) {
return stringRedisTemplate.opsForList().rightPopAndLeftPush(sourceKey,
destinationKey, timeout, unit);
}
/**
* 删除集合中值等于value得元素
*
* @param key
* @param index
* index=0, 删除所有值等于value的元素; index>0, 从头部开始删除第一个值等于value的元素;
* index<0, 从尾部开始删除第一个值等于value的元素;
* @param value
* @return
*/
public Long lRemove(String key, long index, String value) {
return stringRedisTemplate.opsForList().remove(key, index, value);
}
/**
* 裁剪list
*
* @param key
* @param start
* @param end
*/
public void lTrim(String key, long start, long end) {
stringRedisTemplate.opsForList().trim(key, start, end);
}
/**
* 获取列表长度
*
* @param key
* @return
*/
public Long lLen(String key) {
return stringRedisTemplate.opsForList().size(key);
}
/** --------------------set相关操作-------------------------- */
/**
* set添加元素
*
* @param key
* @param values
* @return
*/
public Long sAdd(String key, String... values) {
return stringRedisTemplate.opsForSet().add(key, values);
}
/**
* set移除元素
*
* @param key
* @param values
* @return
*/
public Long sRemove(String key, Object... values) {
return stringRedisTemplate.opsForSet().remove(key, values);
}
/**
* 移除并返回集合的一个随机元素
*
* @param key
* @return
*/
public String sPop(String key) {
return stringRedisTemplate.opsForSet().pop(key);
}
/**
* 将元素value从一个集合移到另一个集合
*
* @param key
* @param value
* @param destKey
* @return
*/
public Boolean sMove(String key, String value, String destKey) {
return stringRedisTemplate.opsForSet().move(key, value, destKey);
}
/**
* 获取集合的大小
*
* @param key
* @return
*/
public Long sSize(String key) {
return stringRedisTemplate.opsForSet().size(key);
}
/**
* 判断集合是否包含value
*
* @param key
* @param value
* @return
*/
public Boolean sIsMember(String key, Object value) {
return stringRedisTemplate.opsForSet().isMember(key, value);
}
/**
* 获取两个集合的交集
*
* @param key
* @param otherKey
* @return
*/
public Set<String> sIntersect(String key, String otherKey) {
return stringRedisTemplate.opsForSet().intersect(key, otherKey);
}
/**
* 获取key集合与多个集合的交集
*
* @param key
* @param otherKeys
* @return
*/
public Set<String> sIntersect(String key, Collection<String> otherKeys) {
return stringRedisTemplate.opsForSet().intersect(key, otherKeys);
}
/**
* key集合与otherKey集合的交集存储到destKey集合中
*
* @param key
* @param otherKey
* @param destKey
* @return
*/
public Long sIntersectAndStore(String key, String otherKey, String destKey) {
return stringRedisTemplate.opsForSet().intersectAndStore(key, otherKey,
destKey);
}
/**
* key集合与多个集合的交集存储到destKey集合中
*
* @param key
* @param otherKeys
* @param destKey
* @return
*/
public Long sIntersectAndStore(String key, Collection<String> otherKeys,
String destKey) {
return stringRedisTemplate.opsForSet().intersectAndStore(key, otherKeys,
destKey);
}
/**
* 获取两个集合的并集
*
* @param key
* @param otherKeys
* @return
*/
public Set<String> sUnion(String key, String otherKeys) {
return stringRedisTemplate.opsForSet().union(key, otherKeys);
}
/**
* 获取key集合与多个集合的并集
*
* @param key
* @param otherKeys
* @return
*/
public Set<String> sUnion(String key, Collection<String> otherKeys) {
return stringRedisTemplate.opsForSet().union(key, otherKeys);
}
/**
* key集合与otherKey集合的并集存储到destKey中
*
* @param key
* @param otherKey
* @param destKey
* @return
*/
public Long sUnionAndStore(String key, String otherKey, String destKey) {
return stringRedisTemplate.opsForSet().unionAndStore(key, otherKey, destKey);
}
/**
* key集合与多个集合的并集存储到destKey中
*
* @param key
* @param otherKeys
* @param destKey
* @return
*/
public Long sUnionAndStore(String key, Collection<String> otherKeys,
String destKey) {
return stringRedisTemplate.opsForSet().unionAndStore(key, otherKeys, destKey);
}
/**
* 获取两个集合的差集
*
* @param key
* @param otherKey
* @return
*/
public Set<String> sDifference(String key, String otherKey) {
return stringRedisTemplate.opsForSet().difference(key, otherKey);
}
/**
* 获取key集合与多个集合的差集
*
* @param key
* @param otherKeys
* @return
*/
public Set<String> sDifference(String key, Collection<String> otherKeys) {
return stringRedisTemplate.opsForSet().difference(key, otherKeys);
}
/**
* key集合与otherKey集合的差集存储到destKey中
*
* @param key
* @param otherKey
* @param destKey
* @return
*/
public Long sDifference(String key, String otherKey, String destKey) {
return stringRedisTemplate.opsForSet().differenceAndStore(key, otherKey,
destKey);
}
/**
* key集合与多个集合的差集存储到destKey中
*
* @param key
* @param otherKeys
* @param destKey
* @return
*/
public Long sDifference(String key, Collection<String> otherKeys,
String destKey) {
return stringRedisTemplate.opsForSet().differenceAndStore(key, otherKeys,
destKey);
}
/**
* 获取集合所有元素
*
* @param key
* @param
* @param
* @return
*/
public Set<String> setMembers(String key) {
return stringRedisTemplate.opsForSet().members(key);
}
/**
* 随机获取集合中的一个元素
*
* @param key
* @return
*/
public String sRandomMember(String key) {
return stringRedisTemplate.opsForSet().randomMember(key);
}
/**
* 随机获取集合中count个元素
*
* @param key
* @param count
* @return
*/
public List<String> sRandomMembers(String key, long count) {
return stringRedisTemplate.opsForSet().randomMembers(key, count);
}
/**
* 随机获取集合中count个元素并且去除重复的
*
* @param key
* @param count
* @return
*/
public Set<String> sDistinctRandomMembers(String key, long count) {
return stringRedisTemplate.opsForSet().distinctRandomMembers(key, count);
}
/**
*
* @param key
* @param options
* @return
*/
public Cursor<String> sScan(String key, ScanOptions options) {
return stringRedisTemplate.opsForSet().scan(key, options);
}
/**------------------zSet相关操作--------------------------------*/
/**
* 添加元素,有序集合是按照元素的score值由小到大排列
*
* @param key
* @param value
* @param score
* @return
*/
public Boolean zAdd(String key, String value, double score) {
return stringRedisTemplate.opsForZSet().add(key, value, score);
}
/**
*
* @param key
* @param values
* @return
*/
public Long zAdd(String key, Set<TypedTuple<String>> values) {
return stringRedisTemplate.opsForZSet().add(key, values);
}
/**
*
* @param key
* @param values
* @return
*/
public Long zRemove(String key, Object... values) {
return stringRedisTemplate.opsForZSet().remove(key, values);
}
public Long zRemove(String key, Collection<String> values) {
if(values!=null&&!values.isEmpty()){
Object[] objs = values.toArray(new Object[values.size()]);
return stringRedisTemplate.opsForZSet().remove(key, objs);
}
return 0L;
}
/**
* 增加元素的score值,并返回增加后的值
*
* @param key
* @param value
* @param delta
* @return
*/
public Double zIncrementScore(String key, String value, double delta) {
return stringRedisTemplate.opsForZSet().incrementScore(key, value, delta);
}
/**
* 返回元素在集合的排名,有序集合是按照元素的score值由小到大排列
*
* @param key
* @param value
* @return 0表示第一位
*/
public Long zRank(String key, Object value) {
return stringRedisTemplate.opsForZSet().rank(key, value);
}
/**
* 返回元素在集合的排名,按元素的score值由大到小排列
*
* @param key
* @param value
* @return
*/
public Long zReverseRank(String key, Object value) {
return stringRedisTemplate.opsForZSet().reverseRank(key, value);
}
/**
* 获取集合的元素, 从小到大排序
*
* @param key
* @param start
* 开始位置
* @param end
* 结束位置, -1查询所有
* @return
*/
public Set<String> zRange(String key, long start, long end) {
return stringRedisTemplate.opsForZSet().range(key, start, end);
}
/**
* 获取zset集合的所有元素, 从小到大排序
*
*/
public Set<String> zRangeAll(String key) {
return zRange(key,0,-1);
}
/**
* 获取集合元素, 并且把score值也获取
*
* @param key
* @param start
* @param end
* @return
*/
public Set<TypedTuple<String>> zRangeWithScores(String key, long start,
long end) {
return stringRedisTemplate.opsForZSet().rangeWithScores(key, start, end);
}
/**
* 根据Score值查询集合元素
*
* @param key
* @param min
* 最小值
* @param max
* 最大值
* @return
*/
public Set<String> zRangeByScore(String key, double min, double max) {
return stringRedisTemplate.opsForZSet().rangeByScore(key, min, max);
}
/**
* 根据Score值查询集合元素, 从小到大排序
*
* @param key
* @param min
* 最小值
* @param max
* 最大值
* @return
*/
public Set<TypedTuple<String>> zRangeByScoreWithScores(String key,
double min, double max) {
return stringRedisTemplate.opsForZSet().rangeByScoreWithScores(key, min, max);
}
/**
*
* @param key
* @param min
* @param max
* @param start
* @param end
* @return
*/
public Set<TypedTuple<String>> zRangeByScoreWithScores(String key,
double min, double max, long start, long end) {
return stringRedisTemplate.opsForZSet().rangeByScoreWithScores(key, min, max,
start, end);
}
/**
* 获取集合的元素, 从大到小排序
*
* @param key
* @param start
* @param end
* @return
*/
public Set<String> zReverseRange(String key, long start, long end) {
return stringRedisTemplate.opsForZSet().reverseRange(key, start, end);
}
public Set<String> zReverseRangeByScore(String key, long min, long max) {
return stringRedisTemplate.opsForZSet().reverseRangeByScore(key, min, max);
}
/**
* 获取集合的元素, 从大到小排序, 并返回score值
*
* @param key
* @param start
* @param end
* @return
*/
public Set<TypedTuple<String>> zReverseRangeWithScores(String key,
long start, long end) {
return stringRedisTemplate.opsForZSet().reverseRangeWithScores(key, start,
end);
}
/**
* 根据Score值查询集合元素, 从大到小排序
*
* @param key
* @param min
* @param max
* @return
*/
public Set<String> zReverseRangeByScore(String key, double min,
double max) {
return stringRedisTemplate.opsForZSet().reverseRangeByScore(key, min, max);
}
/**
* 根据Score值查询集合元素, 从大到小排序
*
* @param key
* @param min
* @param max
* @return
*/
public Set<TypedTuple<String>> zReverseRangeByScoreWithScores(
String key, double min, double max) {
return stringRedisTemplate.opsForZSet().reverseRangeByScoreWithScores(key,
min, max);
}
/**
*
* @param key
* @param min
* @param max
* @param start
* @param end
* @return
*/
public Set<String> zReverseRangeByScore(String key, double min,
double max, long start, long end) {
return stringRedisTemplate.opsForZSet().reverseRangeByScore(key, min, max,
start, end);
}
/**
* 根据score值获取集合元素数量
*
* @param key
* @param min
* @param max
* @return
*/
public Long zCount(String key, double min, double max) {
return stringRedisTemplate.opsForZSet().count(key, min, max);
}
/**
* 获取集合大小
*
* @param key
* @return
*/
public Long zSize(String key) {
return stringRedisTemplate.opsForZSet().size(key);
}
/**
* 获取集合大小
*
* @param key
* @return
*/
public Long zZCard(String key) {
return stringRedisTemplate.opsForZSet().zCard(key);
}
/**
* 获取集合中value元素的score值
*
* @param key
* @param value
* @return
*/
public Double zScore(String key, Object value) {
return stringRedisTemplate.opsForZSet().score(key, value);
}
/**
* 移除指定索引位置的成员
*
* @param key
* @param start
* @param end
* @return
*/
public Long zRemoveRange(String key, long start, long end) {
return stringRedisTemplate.opsForZSet().removeRange(key, start, end);
}
/**
* 根据指定的score值的范围来移除成员
*
* @param key
* @param min
* @param max
* @return
*/
public Long zRemoveRangeByScore(String key, double min, double max) {
return stringRedisTemplate.opsForZSet().removeRangeByScore(key, min, max);
}
/**
* 获取key和otherKey的并集并存储在destKey中
*
* @param key
* @param otherKey
* @param destKey
* @return
*/
public Long zUnionAndStore(String key, String otherKey, String destKey) {
return stringRedisTemplate.opsForZSet().unionAndStore(key, otherKey, destKey);
}
/**
*
* @param key
* @param otherKeys
* @param destKey
* @return
*/
public Long zUnionAndStore(String key, Collection<String> otherKeys,
String destKey) {
return stringRedisTemplate.opsForZSet()
.unionAndStore(key, otherKeys, destKey);
}
/**
* 交集
*
* @param key
* @param otherKey
* @param destKey
* @return
*/
public Long zIntersectAndStore(String key, String otherKey,
String destKey) {
return stringRedisTemplate.opsForZSet().intersectAndStore(key, otherKey,
destKey);
}
/**
* 交集
*
* @param key
* @param otherKeys
* @param destKey
* @return
*/
public Long zIntersectAndStore(String key, Collection<String> otherKeys,
String destKey) {
return stringRedisTemplate.opsForZSet().intersectAndStore(key, otherKeys,
destKey);
}
/**
*
* @param key
* @param options
* @return
*/
public Cursor<TypedTuple<String>> zScan(String key, ScanOptions options) {
return stringRedisTemplate.opsForZSet().scan(key, options);
}
/**
* 扫描主键,建议使用
* @param patten
* @return
*/
public Set<String> scan(String patten){
Set<String> keys = stringRedisTemplate.execute((RedisCallback<Set<String>>) connection -> {
Set<String> result = new HashSet<>();
try (Cursor<byte[]> cursor = connection.scan(new ScanOptions.ScanOptionsBuilder()
.match(patten).count(10000).build())) {
while (cursor.hasNext()) {
result.add(new String(cursor.next()));
}
} catch (IOException e) {
e.printStackTrace();
}
return result;
});
return keys;
}
/**
* 管道技术,提高性能
* @param type
* @param values
* @return
*/
public List<Object> lRightPushPipeline(String type,Collection<String> values){
List<Object> results = stringRedisTemplate.executePipelined(new RedisCallback<Object>() {
public Object doInRedis(RedisConnection connection) throws DataAccessException {
StringRedisConnection stringRedisConn = (StringRedisConnection)connection;
//集合转换数组
String[] strings = values.toArray(new String[values.size()]);
//直接批量发送
stringRedisConn.rPush(type, strings);
return null;
}
});
return results;
}
public List<Object> refreshWithPipeline(String future_key,String topic_key,Collection<String> values){
List<Object> objects = stringRedisTemplate.executePipelined(new RedisCallback<Object>() {
@Nullable
@Override
public Object doInRedis(RedisConnection redisConnection) throws DataAccessException {
StringRedisConnection stringRedisConnection = (StringRedisConnection)redisConnection;
String[] strings = values.toArray(new String[values.size()]);
stringRedisConnection.rPush(topic_key,strings);
stringRedisConnection.zRem(future_key,strings);
return null;
}
});
return objects;
}
}
④. 自动配置
新建 heima-leadnews-common/src/main/resources/META-INF/spring.factories
文件:
com.heima.common.redis.CacheService
⑷. 测试
①. list
新建 heima-leadnews-service/heima-leadnews-schedule/src/test/java/com/heima/schedule/RedisTest.java
文件:
@SpringBootTest(classes = ScheduleApplication.class)
@RunWith(SpringRunner.class)
public class RedisTest {
@Autowired
private CacheService cacheService;
@Test
public void testList() {
// list左侧添加元素
// cacheService.lLeftPush("list_001", "hello, redis");
// 在右边获取元素并删除
String list_001 = cacheService.lRightPop("list_001");
System.out.println(list_001);
}
}
②. Zset
编辑 heima-leadnews-service/heima-leadnews-schedule/src/test/java/com/heima/schedule/RedisTest.java
文件:
@Test
public void testZset() {
// 添加数据到zset中 分值
/*cacheService.zAdd("zset_key_001", "hello zset 001", 8888);
cacheService.zAdd("zset_key_001", "hello zset 002", 6666);
cacheService.zAdd("zset_key_001", "hello zset 003", 2222);
cacheService.zAdd("zset_key_001", "hello zset 004", 99999);*/
// 按照分值获取数据
Set<String> zset_key_001 = cacheService.zRangeByScore("zset_key_001", 0, 8888);
System.out.println(zset_key_001);
}
7. 服务实现
⑴. Mapper
①. taskinfoMapper
新建 heima-leadnews-service/heima-leadnews-schedule/src/main/java/com/heima/schedule/mapper/TaskinfoMapper.java
文件:
/**
* <p>
* Mapper 接口
* </p>
*
* @author itheima
*/
@Mapper
public interface TaskinfoMapper extends BaseMapper<Taskinfo> {
public List<Taskinfo> queryFutureTime(@Param("taskType")int type, @Param("priority")int priority, @Param("future")Date future);
}
②. xml
新建 heima-leadnews-service/heima-leadnews-schedule/src/main/resources/mapper/TaskinfoMapper.xml
文件:
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.heima.schedule.mapper.TaskinfoMapper">
<select id="queryFutureTime" resultType="com.heima.model.schedule.pojos.Taskinfo">
select *
from taskinfo
where task_type = #{taskType}
and priority = #{priority}
and execute_time <![CDATA[<]]> #{future,javaType=java.util.Date}
</select>
</mapper>
③. taskinfoLogsMapper
新建 heima-leadnews-service/heima-leadnews-schedule/src/main/java/com/heima/schedule/mapper/TaskinfoLogsMapper.java
文件:
/**
* <p>
* Mapper 接口
* </p>
*
* @author itheima
*/
@Mapper
public interface TaskinfoLogsMapper extends BaseMapper<TaskinfoLogs> {
}
⑵. Dto
新建 heima-leadnews-model/src/main/java/com/heima/model/schedule/dtos/Task.java
文件:
@Data
public class Task implements Serializable {
/**
* 任务id
*/
private Long taskId;
/**
* 类型
*/
private Integer taskType;
/**
* 优先级
*/
private Integer priority;
/**
* 执行id
*/
private long executeTime;
/**
* task参数
*/
private byte[] parameters;
}
⑶. 常量
新建 heima-leadnews-common/src/main/java/com/heima/common/constants/ScheduleConstants.java
文件:
public class ScheduleConstants {
//task状态
public static final int SCHEDULED=0; //初始化状态
public static final int EXECUTED=1; //已执行状态
public static final int CANCELLED=2; //已取消状态
public static String FUTURE="future_"; //未来数据key前缀
public static String TOPIC="topic_"; //当前数据key前缀
}
⑷. Service
新建 heima-leadnews-service/heima-leadnews-schedule/src/main/java/com/heima/schedule/service/TaskService.java
文件:
public interface TaskService {
/**
* 添加延时任务
* @param task
* @return
*/
public Long addTask(Task task);
}
⑸. ServiceImpl
新建 heima-leadnews-service/heima-leadnews-schedule/src/main/java/com/heima/schedule/service/impl/TaskServiceImpl.java
文件:
@Service
@Slf4j
@Transactional
public class TaskServiceImpl implements TaskService {
/**
* 添加延迟任务
* @param task
* @return
*/
@Override
public long addTask(Task task) {
//1.添加任务到数据库中
boolean success = addTaskToDb(task);
if(success) {
//2.添加任务到redis
addTaskToCache(task);
}
return task.getTaskId();
}
@Autowired
private CacheService cacheService;
/**
* 添加任务到redis
* @param task
*/
private void addTaskToCache(Task task) {
// 设置redis的 key
String key = task.getTaskType() +"_" +task.getPriority();
// 获取五分钟之后的时间 毫秒值
Calendar calendar = Calendar.getInstance();
calendar.add(Calendar.MINUTE, 5);
long nextScheduleTime = calendar.getTimeInMillis();
// 2.1 如果任务的执行时间小于等于当前时间,存入list
if(task.getExecuteTime() <= System.currentTimeMillis()) {
cacheService.lLeftPush(ScheduleConstants.TOPIC + key, JSON.toJSONString(task));
} else if(task.getExecuteTime() <= nextScheduleTime){
// 2.2 如果任务的执行时间大于当前时间 && 小于等于预设时间(未来5分钟) 存入zset中
cacheService.zAdd(ScheduleConstants.FUTURE + key, JSON.toJSONString(task), task.getExecuteTime());
}
}
@Autowired
private TaskinfoMapper taskinfoMapper;
@Autowired
private TaskinfoLogsMapper taskinfoLogsMapper;
/**
* 添加任务到数据库中
* @param task
* @return
*/
private boolean addTaskToDb(Task task) {
boolean flag = false;
try {
// 1. 保存任务表
Taskinfo taskinfo = new Taskinfo();
BeanUtils.copyProperties(task, taskinfo);
taskinfo.setExecuteTime(new Date(task.getExecuteTime()));
taskinfoMapper.insert(taskinfo);
// 设置taskId
task.setTaskId(taskinfo.getTaskId());
// 2. 保存任务日志数据
TaskinfoLogs taskinfoLogs = new TaskinfoLogs();
BeanUtils.copyProperties(taskinfo, taskinfoLogs);
taskinfoLogs.setVersion(1);
taskinfoLogs.setStatus(ScheduleConstants.SCHEDULED);
taskinfoLogsMapper.insert(taskinfoLogs);
flag = true;
} catch (Exception e) {
flag = false;
e.printStackTrace();
}
return flag;
}
}
⑹. 测试
①. 当下时间
新建 heima-leadnews-service/heima-leadnews-schedule/src/test/java/com/heima/schedule/service/impl/TaskServiceImplTest.java
文件:
@SpringBootTest(classes = ScheduleApplication.class)
@RunWith(SpringRunner.class)
public class TaskServiceImplTest {
@Autowired
private TaskService taskService;
@Test
public void addTask() {
Task task = new Task();
task.setTaskType(100);
task.setPriority(50);
task.setParameters("task test".getBytes());
// 当下时间
task.setExecuteTime(new Date().getTime());
// 未来时间
// task.setExecuteTime(new Date().getTime() + 500);
// 未来时间(超过五分钟)
// task.setExecuteTime(new Date().getTime() + 500000);
long taskId = taskService.addTask(task);
System.out.println(taskId);
}
}
②. 未来时间
编辑 heima-leadnews-service/heima-leadnews-schedule/src/test/java/com/heima/schedule/service/impl/TaskServiceImplTest.java
文件:
// 当下时间
// task.setExecuteTime(new Date().getTime());
// 未来时间
task.setExecuteTime(new Date().getTime() + 500);
③. 未来时间(超过五分钟)
编辑 heima-leadnews-service/heima-leadnews-schedule/src/test/java/com/heima/schedule/service/impl/TaskServiceImplTest.java
文件:
// 未来时间
// task.setExecuteTime(new Date().getTime() + 500);
// 未来时间(超过五分钟)
task.setExecuteTime(new Date().getTime() + 500000);
三、延迟对列服务
1. 取消任务
⑴. 需求分析
场景:第三接口网络不通,使用延迟任务进行重试,当达到阈值以后,取消任务
- 根据taskid删除任务,修改任务日志状态为
- (取消) 删除redis中对应的任务数据,包括list和zset
⑵. Service
编辑 heima-leadnews-service/heima-leadnews-schedule/src/main/java/com/heima/schedule/service/TaskService.java
文件:
/**
* 取消任务
* @param taskId
* @return
*/
public boolean cancelTask(Long taskId);
⑶. ServiceImpl
编辑 heima-leadnews-service/heima-leadnews-schedule/src/main/java/com/heima/schedule/service/impl/TaskServiceImpl.java
文件:
/**
* 添加延迟任务
* @param task
* @return
*/
@Override
public long addTask(Task task) {
//1.添加任务到数据库中
boolean success = addTaskToDb(task);
if(success) {
//2.添加任务到redis
addTaskToCache(task);
}
return task.getTaskId();
}
/**
* 取消任务
* @param taskId
* @return
*/
@Override
public boolean cancelTask(long taskId) {
boolean flag = false;
// 1. 删除任务, 更新任务日志
Task task = updateDb(taskId, ScheduleConstants.CANCELLED);
// 2. 删除 redis 数据
if(task != null) {
flag = true;
removeTaskFromCache(task);
}
return flag;
}
⑷. 测试类
1613872161971040257 任务的发布时间 小于当前时间,并且 taskinfo表、taskinfoLogs表、Redis 均存在数据
编辑 heima-leadnews-service/heima-leadnews-schedule/src/test/java/com/heima/schedule/service/impl/TaskServiceImplTest.java
文件:
@Test
public void cancelTask() {
taskService.cancelTask(1613872161971040257L);
}
状态、版本已修改,并且 taskinfo表、Redis 中的数据已删除
2. 消费任务/拉取任务
⑴. 需求分析
⑵. Service
编辑 heima-leadnews-service/heima-leadnews-schedule/src/main/java/com/heima/schedule/service/TaskService.java
文件:
/**
* 按照类型和优先级来拉取任务
* @param type 类型
* @param priority 优先级
* @return
*/
public Task taskPool(int type, int priority);
⑶. ServiceImpl
编辑 heima-leadnews-service/heima-leadnews-schedule/src/main/java/com/heima/schedule/service/impl/TaskServiceImpl.java
文件:
/**
* 删除任务, 更新任务日志
* @param taskId
* @param status
* @return
*/
private Task updateDb(long taskId, int status) {
Task task = null;
try {
// 删除任务
taskinfoMapper.deleteById(taskId);
// 更新任务日志
TaskinfoLogs taskinfoLogs = taskinfoLogsMapper.selectById(taskId);
taskinfoLogs.setStatus(status);
taskinfoLogsMapper.updateById(taskinfoLogs);
task = new Task();
BeanUtils.copyProperties(taskinfoLogs, task);
task.setExecuteTime(new Date().getTime());
} catch (Exception e) {
log.error("task cancel exception taskId={}", taskId);
}
return task;
}
⑷. 测试类
①. 初始数据
②. 测试类
编辑 heima-leadnews-service/heima-leadnews-schedule/src/test/java/com/heima/schedule/service/impl/TaskServiceImplTest.java
文件:
@Test
public void taskPool() {
Task task = taskService.taskPool(100, 50);
System.out.println(task);
}
③. Result
状态、版本已修改,并且 taskinfo表、Redis 中的数据已删除
3. 定时刷新
⑴. reids key值匹配
①. keys 模糊匹配
keys的模糊匹配功能很方便也很强大,但是在生产环境需要慎用!开发中使用keys的模糊匹配却发现redis的CPU使用率极高,所以公司的redis生产环境将keys命令禁用了!redis是单线程,会被堵塞
②. scan
SCAN 命令是一个基于游标的迭代器,SCAN命令每次被调用之后, 都会向用户返回一个新的游标, 用户在下次迭代时需要使用这个新游标作为SCAN命令的游标参数, 以此来延续之前的迭代过程。
③. 测试类
编辑 heima-leadnews-service/heima-leadnews-schedule/src/test/java/com/heima/schedule/service/impl/TaskServiceImplTest.java
文件:
@Test
public void testPiple1(){
long start =System.currentTimeMillis();
for (int i = 0; i <10000 ; i++) {
Task task = new Task();
task.setTaskType(1001);
task.setPriority(1);
task.setExecuteTime(new Date().getTime());
cacheService.lLeftPush("1001_1", JSON.toJSONString(task));
}
System.out.println("耗时"+(System.currentTimeMillis()- start));
// -> 耗时36104
}
@Test
public void testPiple2(){
long start = System.currentTimeMillis();
//使用管道技术
List<Object> objectList = cacheService.getstringRedisTemplate().executePipelined(new RedisCallback<Object>() {
@Nullable
@Override
public Object doInRedis(RedisConnection redisConnection) throws DataAccessException {
for (int i = 0; i <10000 ; i++) {
Task task = new Task();
task.setTaskType(1001);
task.setPriority(1);
task.setExecuteTime(new Date().getTime());
redisConnection.lPush("1001_1".getBytes(), JSON.toJSONString(task).getBytes());
}
return null;
}
});
System.out.println("使用管道技术执行10000次自增操作共耗时:"+(System.currentTimeMillis()-start)+"毫秒");
// -> 使用管道技术执行10000次自增操作共耗时:1383毫秒
}
⑵. ServiceImpl
编辑 heima-leadnews-service/heima-leadnews-schedule/src/main/java/com/heima/schedule/service/impl/TaskServiceImpl.java
文件:
/**
* 未来数据定时刷新
*/
@Scheduled(cron = "0 */1 * * * ?")
public void refreshTask() {
System.out.println(System.currentTimeMillis() / 1000 + "执行了定时任务");
// 1. 获取所有未来数据集合的key值
Set<String> futureKeys = cacheService.scan(ScheduleConstants.FUTURE + "*"); // future_*
for (String futureKey : futureKeys) {
// 2. 获取当前数据的key topic
String topicKey = ScheduleConstants.TOPIC + futureKey.split(ScheduleConstants.FUTURE)[1];
// 3. 获取该组key下当前需要消费的任务数据
Set<String> tasks = cacheService.zRangeByScore(futureKey, 0, System.currentTimeMillis());
// 4. 将这些任务数据添加到消费者队列中
if (!tasks.isEmpty()) {
cacheService.refreshWithPipeline(futureKey, topicKey, tasks);
System.out.println("成功的将" + futureKey + "下的当前需要执行的任务数据刷新到" + topicKey + "下");
}
}
}
⑶. 启动类
编辑 heima-leadnews-service/heima-leadnews-schedule/src/main/java/com/heima/schedule/ScheduleApplication.java
文件:
@EnableScheduling
⑷. 测试类
①. 初始数据
编辑 heima-leadnews-service/heima-leadnews-schedule/src/test/java/com/heima/schedule/service/impl/TaskServiceImplTest.java
文件:
@Test
public void addTaskSet() {
for (int i = 0; i < 5; i++) {
Task task = new Task();
task.setTaskType(100 + i);
task.setPriority(50);
task.setParameters("task test".getBytes());
task.setExecuteTime(new Date().getTime() + 500 + i);
long taskId = taskService.addTask(task);
}
}
②. 定时刷新
启动 ScheduleApplication
服务:
4. redis分布式锁
⑴. 需求分析
分布式锁的解决方案:
方案 | 说明 |
---|---|
数据库 | 基于表的唯一索引 |
zookeeper | 根据zookeeper中的临时有序节点排序 |
redis | 使用SETNX命令完成 |
sexnx (SET if Not eXists) 命令在指定的 key 不存在时,为 key 设置指定的值。
这种加锁的思路是,如果 key 不存在则为 key 设置 value,如果 key 已存在则 SETNX 命令不做任何操作
- 客户端A请求服务器设置key的值,如果设置成功就表示加锁成功
- 客户端B也去请求服务器设置key的值,如果返回失败,那么就代表加锁失败
- 客户端A执行代码完成,删除锁
- 客户端B在等待一段时间后再去请求设置key的值,设置成功
- 客户端B执行代码完成,删除锁
⑵. 存在的问题
如果存在 两个服务 的时候,服务 启动后 ,两个服务都会去执行 refresh定时任务 方法
①. 服务配置
编辑 heima-leadnews-service/heima-leadnews-schedule/src/main/resources/bootstrap.yml
文件:
server:
port: ${server.port:51701}
②. 创建服务
③. 启动服务
⑶. 工具类
编辑 heima-leadnews-common/src/main/java/com/heima/common/redis/CacheService.java
文件:
/**
* 加锁
*
* @param name
* @param expire
* @return
*/
public String tryLock(String name, long expire) {
name = name + "_lock";
String token = UUID.randomUUID().toString();
RedisConnectionFactory factory = stringRedisTemplate.getConnectionFactory();
RedisConnection conn = factory.getConnection();
try {
//参考redis命令:
//set key value [EX seconds] [PX milliseconds] [NX|XX]
Boolean result = conn.set(
name.getBytes(),
token.getBytes(),
Expiration.from(expire, TimeUnit.MILLISECONDS),
RedisStringCommands.SetOption.SET_IF_ABSENT //NX
);
if (result != null && result)
return token;
} finally {
RedisConnectionUtils.releaseConnection(conn, factory,false);
}
return null;
}
⑷. 定时刷新加锁
编辑 heima-leadnews-service/heima-leadnews-schedule/src/main/java/com/heima/schedule/service/impl/TaskServiceImpl.java
文件:
/**
* 未来数据定时刷新
*/
@Scheduled(cron = "0 */1 * * * ?")
public void refreshTask() {
// redis 分布式锁
String token = cacheService.tryLock("FUTURE_TASK_ASYNC", 1000 * 30);
if(StringUtils.isNotBlank(token)) {
⑸. 测试
启动 51701
、 51702
两个服务后,只有一个服务会执行 定时刷新
5. 数据库同步到redis
⑴. 需求分析
⑵. 初始数据
执行 TaskServiceImplTest
的 addTaskSet 方法:
⑶. 同步至redis
编辑 heima-leadnews-service/heima-leadnews-schedule/src/main/java/com/heima/schedule/service/impl/TaskServiceImpl.java
文件:
/**
* 同步未来数据至redis
*/
@Scheduled(cron = "0 */5 * * * ?")
@PostConstruct // 初始执行
public void reloadData() {
// 1. 清空缓存数据
clearCache();
// 2. 查询符合条件的任务 五分钟以内的数据
// 2.1 获取五分钟之后的时间 毫秒值
Calendar calendar = Calendar.getInstance();
calendar.add(Calendar.MINUTE, 5);
// 2.2 查询条件
List<Taskinfo> taskinfos = taskinfoMapper.selectList(Wrappers.<Taskinfo>lambdaQuery().lt(Taskinfo::getExecuteTime, calendar.getTime()));
// 3. 把任务添加至redis
if(taskinfos != null && taskinfos.size() > 0) {
for (Taskinfo taskinfo : taskinfos) {
Task task = new Task();
BeanUtils.copyProperties(taskinfo, task);
task.setExecuteTime(taskinfo.getExecuteTime().getTime());
addTaskToCache(task);
}
}
System.out.println("数据库的任务同步到了redis");
}
/**
* 清理缓存中的数据
*/
public void clearCache() {
Set<String> topicScan = cacheService.scan(ScheduleConstants.TOPIC + "*");
Set<String> futureScan = cacheService.scan(ScheduleConstants.FUTURE + "*");
cacheService.delete(topicScan);
cacheService.delete(futureScan);
}
⑷. 测试
删除redis中的部分数据,启动 Schedule
服务:
6. 延迟对列接口定义
⑴. 提供对外接口
新建 heima-leadnews-feign-api/src/main/java/com/heima/apis/schedule/IScheduleClient.java
文件:
@FeignClient("leadnews-schedule")
public interface IScheduleClient {
/**
* 添加延时任务
* @param task
* @return
*/
@PostMapping("app/v1/task/add")
public ResponseResult addTask(@RequestBody Task task);
/**
* 取消任务
* @param taskId
* @return
*/
@GetMapping("app/v1/task/cancel/{taskId}")
public ResponseResult cancelTask(@PathVariable("taskId") long taskId);
/**
* 按照类型和优先级来拉取任务
* @param type 类型
* @param priority 优先级
* @return
*/
@GetMapping("app/v1/task/pool/{type}/{priority}")
public ResponseResult taskPool(@PathVariable("type") int type, @PathVariable("priority") int priority);
}
⑵. 微服务实现接口
新建 heima-leadnews-service/heima-leadnews-schedule/src/main/java/com/heima/schedule/feign/ScheduleClient.java
文件:
@RestController
public class ScheduleClient implements IScheduleClient {
@Autowired
private TaskService taskService;
/**
* 添加延时任务
* @param task
* @return
*/
@Override
@PostMapping("app/v1/task/add")
public ResponseResult addTask(@RequestBody Task task) {
return ResponseResult.okResult(taskService.addTask(task));
}
/**
* 取消任务
* @param taskId
* @return
*/
@Override
@GetMapping("app/v1/cancel/{taskId}")
public ResponseResult cancelTask(@PathVariable("taskId") long taskId) {
return ResponseResult.okResult(taskService.cancelTask(taskId));
}
/**
* 按照类型和优先级来拉取任务
* @param type 类型
* @param priority 优先级
* @return
*/
@Override
@GetMapping("app/v1/task/pool/{type}/{priority}")
public ResponseResult taskPool(@PathVariable("type") int type, @PathVariable("priority") int priority) {
return ResponseResult.okResult(taskService.taskPool(type, priority));
}
}
7. 添加延迟任务
⑴. 序列化
①. 序列化工具对比
- JdkSerialize: java内置的序列化能将实现了Serilazable接口的对象进行序列化和反序列化, ObjectOutputStream的writeObject()方法可序列化对象生成字节数组
- Protostuff: google开源的protostuff采用更为紧凑的二进制数组,表现更加优异,然后使用protostuff的编译工具生成pojo类
②. pom依赖
编辑 heima-leadnews-utils/pom.xml
:
<dependency>
<groupId>io.protostuff</groupId>
<artifactId>protostuff-core</artifactId>
<version>1.6.0</version>
</dependency>
<dependency>
<groupId>io.protostuff</groupId>
<artifactId>protostuff-runtime</artifactId>
<version>1.6.0</version>
</dependency>
③. JdkSerialize
编辑 heima-leadnews-utils/pom.xml
:
/**
* jdk序列化
*/
public class JdkSerializeUtil {
/**
* 序列化
* @param obj
* @param <T>
* @return
*/
public static <T> byte[] serialize(T obj) {
if (obj == null){
throw new NullPointerException();
}
ByteArrayOutputStream bos = new ByteArrayOutputStream();
try {
ObjectOutputStream oos = new ObjectOutputStream(bos);
oos.writeObject(obj);
return bos.toByteArray();
} catch (Exception ex) {
ex.printStackTrace();
}
return new byte[0];
}
/**
* 反序列化
* @param data
* @param clazz
* @param <T>
* @return
*/
public static <T> T deserialize(byte[] data, Class<T> clazz) {
ByteArrayInputStream bis = new ByteArrayInputStream(data);
try {
ObjectInputStream ois = new ObjectInputStream(bis);
T obj = (T)ois.readObject();
return obj;
} catch (Exception ex) {
ex.printStackTrace();
}
return null;
}
}
④. Protostuff
编辑 heima-leadnews-utils/pom.xml
:
public class ProtostuffUtil {
/**
* 序列化
* @param t
* @param <T>
* @return
*/
public static <T> byte[] serialize(T t){
Schema schema = RuntimeSchema.getSchema(t.getClass());
return ProtostuffIOUtil.toByteArray(t,schema,
LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE));
}
/**
* 反序列化
* @param bytes
* @param c
* @param <T>
* @return
*/
public static <T> T deserialize(byte []bytes,Class<T> c) {
T t = null;
try {
t = c.newInstance();
Schema schema = RuntimeSchema.getSchema(t.getClass());
ProtostuffIOUtil.mergeFrom(bytes,t,schema);
} catch (InstantiationException e) {
e.printStackTrace();
} catch (IllegalAccessException e) {
e.printStackTrace();
}
return t;
}
/**
* jdk序列化与protostuff序列化对比
* @param args
*/
public static void main(String[] args) {
long start =System.currentTimeMillis();
for (int i = 0; i <1000000 ; i++) {
WmNews wmNews =new WmNews();
JdkSerializeUtil.serialize(wmNews);
}
System.out.println(" jdk 花费 "+(System.currentTimeMillis()-start));
// -> jdk 花费 2017
start =System.currentTimeMillis();
for (int i = 0; i <1000000 ; i++) {
WmNews wmNews =new WmNews();
ProtostuffUtil.serialize(wmNews);
}
System.out.println(" protostuff 花费 "+(System.currentTimeMillis()-start));
// -> protostuff 花费 292
}
}
⑤. 测试
执行 ProtostuffUtil
的 main 方法:
jdk 花费 2017
protostuff 花费 292
# protostuff 性能更强
⑵. 枚举
新建 heima-leadnews-model/src/main/java/com/heima/model/common/enums/TaskTypeEnum.java
文件:
@Getter
@AllArgsConstructor
public enum TaskTypeEnum {
NEWS_SCAN_TIME(1001, 1,"文章定时审核"),
REMOTEERROR(1002, 2,"第三方接口调用失败,重试");
private final int taskType; //对应具体业务
private final int priority; //业务不同级别
private final String desc; //描述信息
}
⑶. Service
新建 heima-leadnews-service/heima-leadnews-wemedia/src/main/java/com/heima/wemedia/service/WmNewsTaskService.java
文件:
public interface WmNewsTaskService {
/**
* 添加任务到延迟对列中
* @param id 文章id
* @param publishTime 发布的时间 可以做为任务的执行时间
*/
public void addNewsToTask(Integer id, Date publishTime);
}
⑷. ServiceImpl
新建 heima-leadnews-service/heima-leadnews-wemedia/src/main/java/com/heima/wemedia/service/impl/WmNewsTaskServiceImpl.java
文件:
@Service
@Slf4j
public class WmNewsTaskServiceImpl implements WmNewsTaskService {
@Autowired
private IScheduleClient scheduleClient;
/**
* 添加任务到延迟对列中
* @param id 文章id
* @param publishTime 发布的时间 可以做为任务的执行时间
*/
@Override
@Async
public void addNewsToTask(Integer id, Date publishTime) {
log.info("添加任务到延迟服务中----begin");
Task task = new Task();
task.setExecuteTime(publishTime.getTime());
task.setTaskType(TaskTypeEnum.NEWS_SCAN_TIME.getTaskType());
task.setPriority(TaskTypeEnum.NEWS_SCAN_TIME.getPriority());
WmNews wmNews = new WmNews();
wmNews.setId(id);
task.setParameters(ProtostuffUtil.serialize(wmNews));
scheduleClient.addTask(task);
log.info("添加任务到延迟服务中----end");
}
}
⑸. 调用延迟任务
编辑 heima-leadnews-service/heima-leadnews-wemedia/src/main/java/com/heima/wemedia/service/impl/WmNewsServiceImpl.java
文件:
@Autowired
private WmNewsTaskService wmNewsTaskService;
/**
* 发布/修改文章或发布为草稿
* @param dto
* @return
*/
@Override
public ResponseResult submitNews(WmNewsDto dto) {
// 1. 参数校验
if(dto == null || dto.getContent() == null) {
return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
}
// 2. 保存或修改文章
WmNews wmNews = new WmNews();
// 属性拷贝 属性名词和类型相同才能拷贝
BeanUtils.copyProperties(dto, wmNews);
// 封面图片 list---> string
if(dto.getImages() != null && dto.getImages().size() > 0) {
String imageStr = StringUtils.join(dto.getImages(), ",");
wmNews.setImages(imageStr);
}
// 如果当前封面为自动类型 -1
if(dto.getType().equals(WemediaConstants.WM_NEWS_TYPE_AUTO)) {
wmNews.setType(null);
}
saveOrUpdateWmNews(wmNews);
// 3. 判断是否为草稿, 如果是草稿, 结束当前方法
if(dto.getStatus().equals(WmNews.Status.NORMAL.getCode())) {
return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
}
// 4. 保存文章内容图片和素材的关系
// 提取文章内容中的图片信息
List<String> materials = ectractUrlInfo(dto.getContent());
saveRelativeInfoForContent(materials, wmNews.getId());
// 5. 保存文章封面图片和素材的关系, 如果当然布局是自动, 需要匹配封面图片
saveRelativeInfoForCover(dto, wmNews, materials);
// 审核文章
// wmNewsAutoScanService.autoScanWmNews(wmNews.getId());
wmNewsTaskService.addNewsToTask(wmNews.getId(), wmNews.getPublishTime());
return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
}
⑹. 测试
启动 WemediaApplication
、 WemediaGatewayAplication
、 ScheduleApplication
、 Nginx,添加文章,查看库表及redis
8. 消费任务、审核文章
⑴. Service
编辑 heima-leadnews-service/heima-leadnews-wemedia/src/main/java/com/heima/wemedia/service/WmNewsTaskService.java
文件:
/**
* 消费任务, 审核文章
*/
public void scanNewsByTask();
⑵. ServiceImpl
编辑 heima-leadnews-service/heima-leadnews-wemedia/src/main/java/com/heima/wemedia/service/impl/WmNewsTaskServiceImpl.java
文件:
@Autowired
private WmNewsAutoScanService wmNewsAutoScanService;
/**
* 消费任务, 审核文章
*/
@Scheduled(fixedRate = 1000)
@Override
public void scanNewsByTask() {
log.info("消费任务, 审核文章");
ResponseResult responseResult = scheduleClient.taskPool(TaskTypeEnum.NEWS_SCAN_TIME.getTaskType(), TaskTypeEnum.NEWS_SCAN_TIME.getPriority());
if(responseResult.getCode().equals(200) && responseResult.getData() != null) {
Task task = JSON.parseObject(JSON.toJSONString(responseResult.getData()), Task.class);
WmNews wmNews = ProtostuffUtil.deserialize(task.getParameters(), WmNews.class);
wmNewsAutoScanService.autoScanWmNews(wmNews.getId());
}
}
⑶. 引导类
编辑 heima-leadnews-service/heima-leadnews-wemedia/src/main/java/com/heima/wemedia/WemediaApplication.java
文件:
@EnableScheduling // 开启调度任务
⑷. 测试
启动 WemediaApplication
、 WemediaGatewayAplication
、 ScheduleApplication
、 ArticleApplication
、 Nginx,添加文章,文章审核状态(自动审核)
添加未来时间, 查看文章审核状态(是否未来时间自动审核)