day04-高并发优化
方案选择
实现了学习计划和学习进度的统计功能。特别是学习进度部分,为了更精确的记录用户上一次播放的进度,我们采用的方案是:前端每隔15秒就发起一次请求,将播放记录写入数据库。
在并发较高的情况下,会给数据库带来非常大的压力
在机器性能一定的情况下,提高单机并发能力就是要尽可能缩短业务的响应时间(ResponseTime),而对响应时间影响最大的往往是对数据库的操作。而从数据库角度来说,我们的业务无非就是读或写两种类型。
对于读多写少的业务,其优化手段大家都比较熟悉了,主要包括两方面:
- 优化代码和SQL
- 添加缓存【如Redis内存级别缓存】
对于写多读少的业务,大家可能较少碰到,优化的手段可能也不太熟悉,这也是我们要讲解的重点。
对于高并发写的优化方案有:
- 优化代码及SQL
- 变同步写为异步写
- 合并写请求:先写到Redis在写到DB,中间的不必要保存的都不写入DB,只将Redis最后一次保存到DB. 如前端每隔15秒就发起一次请求:视频播到第15秒更新moment到15、播到第30秒更新moment到30,播到第45秒更新moment到45,但我们要的是最后用户播放到的moment
-
变同步为异步:
不只是MQ一种方式,很多
合并写请求:【因为我们目的就是降低数据库的操作次数 所以采用这种方案】
写数据到缓存可以出现覆盖
方案
优化位置:
Redis要存的东西:User_id, 课程id 学习到的时间。因为User_id+课程id就是课表ID,所以存LessionID
操作redis的目的,就是更新哪一个视频哪一个小节下面学习了多少时长
public void writeRecordCache(LearningRecord record) {
log.debug("更新学习记录的缓存数据");
try {
// 1.Hashvalue是JSON形式
String json = JsonUtils.toJsonStr(new RecordCacheData(record));
// 2.拼装Hash的key
String key = StringUtils.format(RECORD_KEY_TEMPLATE, record.getLessonId());
// 写入redis
redisTemplate.opsForHash().put(key, record.getSectionId().toString(), json);
// 3.添加缓存过期时间:1分钟
//因为延迟任务是20s执行的,在这之前都持久化到DB了
redisTemplate.expire(key, Duration.ofMinutes(1));
} catch (Exception e) {
log.error("更新学习记录缓存异常", e);
}
}
Redis存的HashValue那个类,转成JSON的
@Data
@NoArgsConstructor
private static class RecordCacheData {
private Long id;
private Integer moment;
private Boolean finished;
public RecordCacheData(LearningRecord record) {
this.id = record.getId();
this.moment = record.getMoment();
this.finished = record.getFinished();
}
}
新改动的地方:
但是少了将moment更新到课表
如何将更新的moment写入DB:
定时任务: 执行时间间隔如果是1分钟的话,当redis中moment更新后的1min才会执行,我们需求用户续播时差不超过30s. <30s执行一次这边数据库压力也很大
所以采用异步延迟任务 :比较moment是否一致,不一致:Redis中的moment又变了说明还在学习,不用更新到DB
延迟任务
DelayQueue:
这里我们是直接同一个线程来执行任务了。当没有任务的时候线程会被阻塞。而在实际开发中,我们会准备线程池,开启多个线程来执行队列中的任务。
代码改造
工具类
我们的缓存工具类就应该具备上述4个方法:
- ① 添加播放记录到Redis,并添加一个延迟检测任务到DelayQueue
- ② 查询Redis缓存中的指定小节的播放记录
- ③ 删除Redis缓存中的指定小节的播放记录
- ④ 异步执行DelayQueue中的延迟检测任务,检测播放进度是否变化,如果无变化则写入数据库
/**
* 添加指定学习记录到redis,并提交延迟任务到延迟队列DelayQueue
*
* @param record 学习记录信息
*/
public void addLearningRecordTask(LearningRecord record) {
// 1.添加数据到Redis缓存
writeRecordCache(record);
// 2.提交延迟任务到延迟队列 DelayQueue
queue.add(new DelayTask<>(new RecordTaskData(record), Duration.ofSeconds(20)));//Duration延迟时间
}
RecordTaskData用于延迟任务比对moment:
@Data
@NoArgsConstructor
private static class RecordTaskData {
private Long lessonId;
private Long sectionId;
private Integer moment;
public RecordTaskData(LearningRecord record) {
this.lessonId = record.getLessonId();
this.sectionId = record.getSectionId();
this.moment = record.getMoment();
}
}
CompletableFuture.runAsync(this::handleDelayTask) 异步并发的一个类,项目启动init()这个线程,会新开一个线程执行handleDelayTask()
// volatile关键字:在多线程环境中,当一个线程修改了这个变量的值,其他线程能够立即看到最新的值。
private static volatile boolean begin = true;
// 线程池
private static ExecutorService executor = null;
// 项目启动后,当前类实例化创建对象 属性注入值后 该方法会运行,一般用于初始化工作
@PostConstruct
public void init() {
log.info("init方法执行了");
// 核心线程数等于CPU核心数
Integer corePoolSize = Runtime.getRuntime().availableProcessors();
// 创建线程池
executor = Executors.newFixedThreadPool(corePoolSize);
CompletableFuture.runAsync(this::handleDelayTask);
// executor.execute(this::handleDelayTask);
}
// 项目销毁前后,关闭延迟队列
@PreDestroy
public void destroy() {
log.debug("关闭学习记录处理的延迟任务");
// 关闭线程池
executor.shutdown();
begin = false;
}
处理延时任务
private void handleDelayTask() {
while (begin) {
try {
// 1.从延迟队列尝试获取任务,poll:非阻塞方法,take非阻塞方法
DelayTask<RecordTaskData> task = queue.take();
executor.submit(()->{ // 线程池提取线程执行
RecordTaskData data = task.getData();
// 2.读取Redis缓存的学习记录
LearningRecord record = readRecordCache(data.getLessonId(), data.getSectionId());
log.debug("获取到要处理的播放记录任务,任务数据:{},缓存数据:{}", task.getData(), record);
if (record == null) {
return;
}
// 3.比较新提交的延迟任务的视频播放进度数值和redis缓存中的是否一致
if (!Objects.equals(data.getMoment(), record.getMoment())) {
// 4.如果不一致,播放进度在变化,无需持久化
return;
}
// 5.如果一致,证明用户离开了视频,需要持久化
// 5.1.更新学习记录
record.setFinished(null);
recordMapper.updateById(record);
// 5.2.更新课表
LearningLesson lesson = new LearningLesson();
lesson.setId(data.getLessonId());
lesson.setLatestSectionId(data.getSectionId());
lesson.setLatestLearnTime(LocalDateTime.now());
lessonService.updateById(lesson);
});
log.debug("准备持久化学习记录信息");
} catch (Exception e) {
log.error("处理播放记录任务发生异常", e);
}
}
}
学习记录流程改造
Step2: 查询指定学习记录是否已存在,先从redis中查询,redis没命中则从数据库中查询
private LearningRecord queryOldRecord(Long lessonId, Long sectionId) {
// 查询redis缓存
LearningRecord cacheRecord = taskHandler.readRecordCache(lessonId, sectionId);
// redis缓存命中,直接返回
if (cacheRecord != null) {
return cacheRecord;
}
// redis缓存未命中,查询数据库
LearningRecord dbRecord = this.lambdaQuery().eq(LearningRecord::getLessonId, lessonId)
.eq(LearningRecord::getSectionId, sectionId).one();
// 数据库查询结果为null,表示记录不存在,需要新增学习记录,返回null即可
if (dbRecord == null) {
return null;
}
// 数据库查询结果写入redis缓存
taskHandler.writeRecordCache(dbRecord);
return dbRecord;
}
Step1+4:
// 判断本小节是否是首次完成:之前未完成且视频播放进度大于50%
boolean isFinished = !oldRecord.getFinished() && dto.getMoment() * 2 >= dto.getDuration();
// 更新学习记录,根据学习记录LearningRecord主键id进行匹配
if (!isFinished) {
LearningRecord record = LearningRecord.builder()
.id(oldRecord.getId())
.lessonId(dto.getLessonId())
.sectionId(dto.getSectionId())
.finished(oldRecord.getFinished())
.moment(dto.getMoment())
.build();
// 添加指定学习记录到redis,并提交延迟任务到延迟队列DelayQueue
taskHandler.addLearningRecordTask(record);
// 返回,本小节未完成
return false;
}
清理redis相应record
taskHandler.cleanRecordCache(dto.getLessonId(), dto.getSectionId());
首次完成视频播放,可以增加积分,发送MQ消息实现
if (isFinished) {
// 发送MQ消息实现观看学习视频获取积分
rabbitMqHelper.send(MqConstants.Exchange.LEARNING_EXCHANGE,
MqConstants.Key.LEARN_SECTION,
SignInMessage.of(userId,10)); // 学习一个视频 + 10积分
}
线程池
Executors.newxxx()
没有指定队列的capacity可能会OOM(queue方的任务太多导致内存溢出),阿里规范禁用这种,用下面显式new ThreadPoolExecutor()
线程池的运行流程:
- 先看有无空闲的核心线程,若有执行任务,没有不是直接用临时线程,
- 而是把任务放到阻塞队列排队,核心线程从队列中取take;
- 当阻塞队列满了,才看有无可用临时线程;没有的话,创建临时线程,
- 若临时线程达到上限,拒绝。