使用Redission分布式锁与Kafka消息队列,实现学生抢课系统(高并发秒杀场景)。
目录
- 一、思路
- 1.为频繁访问的信息设置缓存
- (1)登陆
- (2)课程任务信息
- (3)用户抢课记录
- 2.消息队列和分布式锁
- (1)抢课消息队列
- (2)锁定缓存抢夺
- (3)数量缓存操作
- 二、具体流程
- 1.抢课任务设置
- 2.用户抢课
- 三、实体类表
- 四、核心代码
- 1.消费端
- 2.数量缓存操作逻辑
- 五、两种分布式锁:
- 1.基于redis命令的分布式锁
- a.加锁
- b.解锁
- c.局限性
- 2. redission分布式锁
- 参考文章
一、思路
1.为频繁访问的信息设置缓存
后台设置抢课任务,配置抢课时间范围。一般情况下,用户会在抢课时间开始前至抢课前期一段的时间集中访问系统。
(1)登陆
系统在登陆时会查询数据库返回用户信息,所以需要对登陆接口进行改造,将用户信息(比如姓名、年级、班级、联系方式等)保存至缓存中。当用户信息发生变更时,才将信息从缓存中清除。
(2)课程任务信息
用户登陆进入系统后,进入到抢课任务模块对应的抢课任务中,进行课程选择。在抢课任务期间,用户会为频繁的访问该模块信息(抢课任务、关联课程、课程库存等),后台在进行信息校验时也会用到以上数据。
(3)用户抢课记录
在抢课期间,用户抢课退课操作和查询比较频繁,可将该数据保存至缓存,方便查询。
2.消息队列和分布式锁
为Kafka的抢课消息队列设置了一个主题五个分区,可处理大量并发抢课消息的情况。然而在每个分区内消息是有序的;不同分区中的消息无序,会出现多个进程同时进行的情况,而多个进程必须以互斥的方式独占共享资源(课程库存)。
为保证每一课程库存操作的独占性,为课程库存设置了锁定缓存(LockKey)和数量缓存(StockKey)。
注意:当消息抢夺到锁定缓存时,才可对数量缓存进行扣除(-1)的操作。
(1)抢课消息队列
对用户抢课数据进行校验(是否符合年级、已选课程日期冲突炖、课程班级人数限制、课程库存余量等),校验通过后将请求数据发送至Kafka抢课消息队列。
(2)锁定缓存抢夺
此处使用了Redission分布式锁,当同时有多个用户发送同一课程的消息时,消费端接收到消息在5秒内尝试获取锁定缓存(LockKey),若获取成功加锁30秒,否则失效。
(3)数量缓存操作
获取锁定缓存(LockKey)成功后,查询数量缓存(StockKey)。此时需要对各项业务数据再次进行校验,因为在数据进入消息队列前进程仍是并发的,可能会出现数据已变动的情况。在满足抢课条件后,取出数量缓存(StockKey)库存进行数量减一的操作,操作完毕最终释放锁定缓存(LockKey)。
二、具体流程
1.抢课任务设置
(1)后台管理人员配置抢课任务信息(开始时间、结束时间、课程、课程时间等)。
(2)课程任务正式发布,保存信息(任务、关联课程、自定义课程可选人数等)到缓存;当任务取消发布时,需要将对应的缓存删除。
2.用户抢课
(1)首次登陆查询用户信息,并保存至缓存。
(2)进入抢课任务信息,查看可选课程列表。
(3)点击抢课按钮发送请求。
(4)选课成功:保存学生对应任务已选课程集合到缓存;更新任务课程库存数量等缓存信息。
(5)退课:更新学生已选课程缓存;清除任务课程库存数量缓存。
三、实体类表
此处列举部分核心数据库表设计。
1.课程表
id | 课程名称 | 课程编号 | 课程教室 | 课程简介 | 教师id | 教师名称 |
---|---|---|---|---|---|---|
1 | 舞蹈兴趣班 | C0001 | 1号楼6楼舞蹈教室 | 面向0基础学生 | 10001 | 王老师 |
2 | 画图兴趣班 | C0002 | 1号楼2楼美术教室 | 面向0基础学生 | 10002 | 李老师 |
3 | 音乐兴趣班 | C0003 | 3号楼1楼音乐教室 | 面向0基础学生 | 10003 | 陈老师 |
2.选课任务表
id | 任务名称 | 可选年级id集合 | 可选班级id集合 | 学生可选课程总数 | 开始时间 | 结束时间 | 发布状态 | 发布时间 | 任务状态 |
---|---|---|---|---|---|---|---|---|---|
1 | 2023年秋季选课 | 2019级id,2020级id | 2020级-1班id,2020级-2班id,2019级-3班id | 2 | 2023-08-01 09:00:00 | 2023-08-07 20:00:00 | 已发布 | 2023-07-20 09:00:00 | 已结束 |
2 | 2024年春季选课 | 2019级id,2020级id | 3 | 2024-01-30 09:00:00 | 2024-01-07 20:00:00 | 已发布 | 2024-01-20 09:00:00 | 进行中 |
3.选课任务课程关联表
id | 任务id | 课程id | 课程可选人数 | 开始日期 | 结束日期 | 课程表json |
---|---|---|---|---|---|---|
1 | 1 | 1 | 50 | 2023-09-20 | 2023-12-30 | [{“name”:“周一”, “section”:[{“name”:“第5节”,“state”:“1”}]},{“name”:“周二”, “section”:[{“name”:“第3节”,“state”:“1”}]}] |
2 | 1 | 2 | 50 | 2023-09-20 | 2023-12-30 | [{“name”:“周二”, “section”:[{“name”:“第3节”,“state”:“1”}]},{“name”:“周四”, “section”:[{“name”:“第5节”,“state”:“1”}]}] |
3 | 1 | 3 | 50 | 2023-09-20 | 2023-12-30 | [{“name”:“周三”, “section”:[{“name”:“第5节”,“state”:“1”}]},{“name”:“周五”, “section”:[{“name”:“第5节”,“state”:“1”}]}] |
4 | 2 | 1 | 50 | 2024-03-01 | 2024-05-30 | [{“name”:“周一”, “section”:[{“name”:“第5节”,“state”:“1”}]},{“name”:“周二”, “section”:[{“name”:“第3节”,“state”:“1”}]}] |
5 | 2 | 2 | 50 | 2024-03-01 | 2024-05-30 | [{“name”:“周二”, “section”:[{“name”:“第3节”,“state”:“1”}]},{“name”:“周四”, “section”:[{“name”:“第5节”,“state”:“1”}]}] |
6 | 3 | 3 | 50 | 2024-03-01 | 2024-05-30 | [{“name”:“周三”, “section”:[{“name”:“第5节”,“state”:“1”}]},{“name”:“周五”, “section”:[{“name”:“第5节”,“state”:“1”}]}] |
4.学生选课关联表
id | 任务id | 课程id | 学生id | 选课状态 | 老师帮选表示 | 选课时间 |
---|---|---|---|---|---|---|
1 | 1 | 1 | 1 | 已选课 | 是 | 2023-08-01 09:01:00 |
2 | 1 | 1 | 2 | 已取消 | 否 | 2023-08-01 09:01:01 |
四、核心代码
1.消费端
(1)Kafka配置:主题、分区初始化
(2)用户抢课数据初步通过校验,发送到消息队列中的处理逻辑。
@Component
@Slf4j
public class KafkaConsumer {
/**
* 初始化学生选课主题分区 5个
* 通过注入一个 NewTopic 类型的 Bean 来创建 topic,如果 topic 已存在,则会忽略。
*/
@Bean
public NewTopic courseSelectionBatchTopic() {
log.info("创建学生选课主题courseSelectionBatchTopic : szxy_oa_course_selection_student_add_topic,分区:5,副本数:1 >>>>>>>>>>>>>>>>>>>>>>>>>>>>> ");
NewTopic newTopic = new NewTopic(OaConstant.COURSE_SELECTION_STUDENT_ADD_TOPIC, 5, (short) 1);
log.info("newTopic:topicName:{},分区: {} >>>>>>>>>>>>>>>>>>>>>>>>>>>> ", newTopic.name(), newTopic.numPartitions());
return newTopic;
}
/**
*添加学生选课主题消息
*/
@KafkaListener(topics = OaConstant.COURSE_SELECTION_STUDENT_ADD_TOPIC,groupId = KafkaProducer.TOPIC_GROUP)
public void courseSelectionStudentAddMsg(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
log.info("COURSE_SELECTION_STUDENT_ADD_TOPIC-学生选课队列消费端 topic:{}, 收到消息>>>>>>>>>>>>>>>>>", topic);
try {
Optional message = Optional.ofNullable(record.value());
if (message.isPresent()) {
Object msg = message.get();
// 先判断消息是否已经已经消费过,5s
String fullKey2 = redisLockUtil.getFullKey(COURSE_SELECTION_STUDENT_CONSUME_LOCK_PREFIX , String.valueOf(msg));
if(redisLockUtil.getLock(fullKey2 , 5000)){
// 获得学生选课入参
CourseSelectionStudentParam param = objectMapper.readValue(String.valueOf(msg), CourseSelectionStudentParam.class);
// 选课任务课程 key 任务id + 任务课程id
Long taskId = param.getTaskId(); // 选课任务id
Long taskcourseId = param.getTaskcourseId(); // 选课任务课程id
String key = taskId + "::" + taskcourseId;
// 获得课程容量库存锁:任务id + 任务课程id,才可以操作库存缓存
String fullKey = redisLockUtil.getFullKey(COURSE_SELECTION_STUDENT_LOCK_PREFIX, key);
final RLock lock = redissonClient.getLock(fullKey);
// 尝试抢【课程容量库存锁】锁时间调整 5s
boolean bool = lock.tryLock(5, 30, TimeUnit.SECONDS); // 5s内尝试加锁,加锁成功后30s失效
if (bool) {
tCourseSelectionStudentService.handleCourseSelectionStudent(param, lock);
log.info("courseSelectionStudentAddMsg 取得更新库存锁,消费了: Topic:" + topic + ",Message:" + String.valueOf(msg));
}
}else {
log.info("courseSelectionStudentAddMsg 已经被消费: Topic:" + topic + ",Message:" + String.valueOf(msg));
}
}
} catch (Exception e) {
e.printStackTrace();
log.error("解析 <"+OaConstant.COURSE_SELECTION_STUDENT_ADD_MESSAGE_KAFKA_TOPIC+"> 数据异常");
} finally {
ack.acknowledge();
}
log.info("COURSE_SELECTION_STUDENT_ADD_TOPIC-学生选课队列消费端 消费结束 >>>>>>>>>>>>>>>>>");
}
}
2.数量缓存操作逻辑
通过Redission分布式锁抢夺锁定缓存后的处理逻辑。
/**
* 处理学生抢课消息
* 入库并修改库存缓存
* 编辑学生选课列表redis
* 通过验证后发送消费消息,需要再次校验redis是否已选课程
* 加入锁对象
*/
@Override
public void handleCourseSelectionStudent(CourseSelectionStudentParam param, RLock lock) {
Date current = new Date();
// 库存余量判断 -> 查询库存key
String stockKey = COURSE_SELECTION_STUDENT_STOCK_PREFIX + param.getTaskId() + "::" + param.getTaskcourseId();
String stockNumStr = redisTemplate.opsForValue().get(stockKey);
log.info("handleCourseSelectionStudent-学生抢课添加处理,开始>>>>>>>>>>>>>>>>>>>>>>>>>>> key={},stockNum={},stuIndentityId={}", stockKey, stockNumStr, param.getStuIdentityId());
if (StringUtils.isBlank(stockNumStr)) {
log.error("handleCourseSelectionStudent-学生抢课添加处理,未查询到库存, key={},stockNum={}", stockKey, stockNumStr);
// 释放锁
if (lock.isLocked() && lock.isHeldByCurrentThread()) {
// 是当前执行线程的锁
lock.unlock();
}
return;
}
int num = Integer.parseInt(stockNumStr) - 1;
if (num < 0) {
log.error("handleCourseSelectionStudent-学生抢课添加处理,库存余量不足,key={},stockNum={},studentNum={}", stockKey, stockNumStr, 1);
// 释放锁
if (lock.isLocked() && lock.isHeldByCurrentThread()) {
// 是当前执行线程的锁
lock.unlock();
}
return;
}
// 入库前再次校验是否已选该课程
// 查询对该课程的选课结果 redis key = taskcourseId + classId + stuIdentityId,防止多次重复点击选课
String couseSelectionStudentPkIdKey2 = CourseSelectionConstants.COURSE_SELECTION_CLASS_STUDENT_PKID_PREFIX + param.getTaskcourseId() + ":" + param.getClassId() + ":" + param.getStuIdentityId();
String courseSelectionPkId = redisTemplate.opsForValue().get(couseSelectionStudentPkIdKey2);
if (StringUtils.isNotBlank(courseSelectionPkId)) {
log.error("handleCourseSelectionStudent-学生抢课添加处理,已存在学生选课记录,key={},taskcourseId={},stuIndentityId={}", couseSelectionStudentPkIdKey2, param.getTaskcourseId(), param.getStuIdentityId());
// 释放锁
// redisLockUtil.delLock(redisLockUtil.getFullKey(COURSE_SELECTION_STUDENT_LOCK_PREFIX, param.getTaskId() + "::" + param.getTaskcourseId()));
if (lock.isLocked() && lock.isHeldByCurrentThread()) {
// 是当前执行线程的锁
lock.unlock();
}
return;
}
// 入库前再次校验,所属班级是否有剩余人数
// 查询是否超过班级可选限制
if (!"0".equals(param.getClassLimit())) {
// 校验是否超过班级可选人数,班级可选人数配置从taskcourse中获取
int classSelectNum = 0;
String classLimitKey = CourseSelectionConstants.COURSE_SELECTION_TASKCOURSE_CLASSLIMIT_PREFIX + param.getOrgId() + ":" + param.getTaskId() + ":" + param.getTaskcourseId() + ":" + param.getClassId();
String classLimitStr = redisTemplate.opsForValue().get(classLimitKey);
if (StringUtils.isNotBlank(classLimitStr)) {
classSelectNum = Integer.parseInt(classLimitStr);
}
// 校验是否超过任务选课数量上限
// 查询学生当前任务的课程记录列表(已选 + 已取消) redis中获取
String stuCourseListKey3 = CourseSelectionConstants.COURSE_SELECTION_STUDENT_COURSELIST_PREFIX + param.getOrgId() + ":" + param.getTaskId() + ":" + param.getStuIdentityId();
String stuCourseStr3 = redisTemplate.opsForValue().get(stuCourseListKey3);
List<CourseSelectionTaskcourseVo> selectionCourseList = new ArrayList<>();
if (StringUtils.isNotBlank(stuCourseStr3)) {
selectionCourseList = JSONUtil.toList(JSONUtil.toJsonStr(stuCourseStr3), CourseSelectionTaskcourseVo.class);
if (CollectionUtil.isNotEmpty(selectionCourseList)) {
// 筛选出是已选状态的课程列表
selectionCourseList = selectionCourseList.stream().filter(c -> "1".equals(c.getSelectionStatus())).collect(Collectors.toList());
}
}
if (CollectionUtil.isNotEmpty(selectionCourseList)) {
// 筛选学生已选当前任务的课程数量、是否存在当前课程的选课记录、是否存在重复上课时间
int taskCourseNum = 0; // 学生在该任务中已选的任务数量
for (CourseSelectionTaskcourseVo taskcourseVo : selectionCourseList) {
String taskId = taskcourseVo.getTaskId();
if (taskId.equals(param.getTaskId().toString())) {
taskCourseNum ++;
}
}
if (taskCourseNum >= param.getMaxSelectNum()) {
log.error("handleCourseSelectionStudent-当前选课任务选课数量已达到上限,任务id:{},学生id:{}", param.getTaskId(), param.getStuIdentityId());
return;
}
}
// 添加选课信息
TCourseSelectionStudent selectionStudent = new TCourseSelectionStudentDTO();
BeanUtil.copyProperties(param, selectionStudent);
selectionStudent.setSelectionStatus(1); //1 选中
selectionStudent.setDelFlag(0);
selectionStudent.setCreatorId(UserHandle.getUserId()); // 学生id/教师id
selectionStudent.setCreateTime(new Date());
getBaseMapper().insert(selectionStudent);
// 修改redis缓存库存,释放锁
redisTemplate.opsForValue().decrement(stockKey, 1); // 库存量-1
// 添加选课结果 redis key = taskcourseId + classId + stuIdentityId,value = pkId,保存30天,退课时删除。
String couseSelectionStudentPkIdKey = CourseSelectionConstants.COURSE_SELECTION_CLASS_STUDENT_PKID_PREFIX + selectionStudent.getTaskcourseId() + ":" + selectionStudent.getClassId() + ":" + selectionStudent.getStuIdentityId();
redisTemplate.opsForValue().set(couseSelectionStudentPkIdKey, selectionStudent.getPkId().toString(), 30, TimeUnit.DAYS);
// 构造个人选课记录缓存
String stuCourseListKey = CourseSelectionConstants.COURSE_SELECTION_STUDENT_COURSELIST_PREFIX + selectionStudent.getOrgId() + ":" + selectionStudent.getTaskId() + ":" + selectionStudent.getStuIdentityId();
String stuCourseStr = redisTemplate.opsForValue().get(stuCourseListKey);
List<CourseSelectionTaskcourseVo> taskcourseList = new ArrayList<>();
if (StringUtils.isNotBlank(stuCourseStr)) {
taskcourseList = JSONUtil.toList(JSONUtil.toJsonStr(stuCourseStr), CourseSelectionTaskcourseVo.class);
}
// 构造学生选课记录列表对象
CourseSelectionTaskcourseVo courseVo = new CourseSelectionTaskcourseVo();
courseVo.setTaskId(param.getTaskId().toString());
courseVo.setPkId(param.getTaskcourseId().toString());
courseVo.setCourseName(param.getTaskcourseName());
courseVo.setCourseNo(param.getTaskcourseNo());
courseVo.setWeekSection(param.getWeekSection());
courseVo.setCourseNumber(param.getCourseNumber());
courseVo.setCourseSelectionStudentId(selectionStudent.getPkId().toString()); // 学生选课记录id
courseVo.setSelectionStatus("1"); //设置为1已选
courseVo.setTeacherSelection(param.getTeacherSelection().toString());
courseVo.setCreateTime(current);
taskcourseList.add(courseVo);
// 保存个人选课记录30天
redisTemplate.opsForValue().set(stuCourseListKey, JSONUtil.toJsonStr(taskcourseList), 30, TimeUnit.DAYS);
// 最后才释放锁
if (lock.isLocked() && lock.isHeldByCurrentThread()) {
// 是当前执行线程的锁
lock.unlock();
}
}
五、两种分布式锁:
1.基于redis命令的分布式锁
a.加锁
1)setnx(lockKey, expireTime)
set if not exist,如果不存在就设置锁。原子方法,返回1代表成功存入锁。
2)get(lockKey)
获取值oldExpireTime,与当前系统时间比较,判断锁是否已超时。若已超时允许其他请求重新获取。
3)getset(lockKey, newValue)
返回当前锁的过期时间。如果与(2)的oldExpireTime相等,说明获取到了锁;否则失败。
b.解锁
delete(lockKey)
在锁定时间内完成操作,主动调用delete解锁
c.局限性
多服务器并发进行getset会出现过期时间覆盖问题。
锁不具备拥有者表示,任何客户端都可解锁。
不支持阻塞等待和重入。
2. redission分布式锁
(1)lua脚本:原子性执行加锁、解锁、广播解锁消息。
(2)可重入锁:通过redis的hash结构实现,内含一对键值对。锁名为hash的名称,UUID+线程ID作为hash的key,锁被重入的次数为value。
(3)等待锁:订阅解锁消息,获取解锁时间,阻塞待唤醒或者超时。
参考文章
Redis分布式锁-这一篇全了解(Redission实现分布式锁完美方案)
redis分布式锁RedissonLock的实现细节
Redis 分布式锁实现的一些方法 setnx()、get()、getset()