目录
使用Mysql
常规测试
张三测试
流程总结
redis优化
修改代码
测试
使用分布式锁
总结
使用Mysql
常规测试
原始代码:
@Override
@Transactional
public ResponseResult selectCourse(SelectParmas selectParmas) {
if (Objects.isNull(selectParmas)){
return new ResponseResult(500,"传入参数有误!");
}
//1. 检查课程限选人数是否已经超标
Course course = courseMapper.findByNo(selectParmas.getCno()+"");
//1.1 获取当前可选人数
int limitSelects = course.getLimitSelects();
//1.2 检查可选人数是否已经<1
if (limitSelects<1){
return new ResponseResult(500,"本课人数已满!");
}
//2. 检查是否有重复选课
//2.1 查询当前用户是否已经选了这门课
int isSelect = studentCourseMapper.findByUnoAndCno(selectParmas);
if (isSelect >0){
return new ResponseResult(500,"不能重复选课哦");
}
//3. 扣减课程表可选人数
//3.1 乐观锁扣减可选数
int cnt = courseMapper.subSelects(course.getCno());
if (cnt<1){
//说明扣减失败,直接返回
return new ResponseResult(500,"本课人数已满!");
}
//4. 插入选课信息表(传入用户id和课程的id)
studentCourseMapper.insert(selectParmas);
return new ResponseResult(200,"选课成功!");
}
以上代码,使用了乐观锁防止课程表可选人数出现超抢的情况
下面模拟1000个线程(用户)同时抢同一门课时,会出现的情况。
生成了1000个登录用户:
tokens文件生成也成功了
postman使用一个用户测试一下看看:
多点几下试试看
postman测试通过,现在用jmeter做压测:
-
1000个线程,1秒钟发起请求
-
设置请求体参数,选课id为1 用户为传进来的no(读取文件)
-
请求头,token 设置为传进来的token(读取文件)
-
开始测试
-
每个请求的平均值(ms)干到了2841
-
查看一下数据库
我又测试了几次,发现结果没有问题。说明以上代码(乐观锁)确实可以防止高并发情况下,出现超出限制人数的抢课情况。
张三测试
但是,凡是都有例外!
设想一下,假如某位比较坏的学生(张三),觉得自己懂一点技术很厉害,使用脚本去抢课,那么会出现啥情况。
这里我使用jmeter,用同一个用户并且模拟1000个线程去发起抢课。
结果:
张三这小子!居然一个人就抢占了10个名额!
回头看代码,找找问题。
@Override
@Transactional
public ResponseResult selectCourse(SelectParmas selectParmas) {
......
//XXXXXXXXXXXXX问题XXXXXXXXXXXXXX
//2. 检查是否有重复选课
//2.1 查询当前用户是否已经选了这门课
int isSelect = studentCourseMapper.findByUnoAndCno(selectParmas);
if (isSelect >0){
return new ResponseResult(500,"不能重复选课哦");
}
//XXXXXXXXXXXXX问题XXXXXXXXXXXXXX
//3. 扣减课程表可选人数
//3.1 乐观锁扣减可选数
int cnt = courseMapper.subSelects(course.getCno());
if (cnt<1){
//说明扣减失败,直接返回
return new ResponseResult(500,"本课人数已满!");
}
//4. 插入选课信息表(传入用户id和课程的id)
studentCourseMapper.insert(selectParmas);
return new ResponseResult(200,"选课成功!");
}
观察代码可以发现,在检查是否有重复选课这里,其实存在问题的。
一般的学生确实不会出现问题,但是遇上张三这样的学生就遭殃了。因为多个线程同时进来,假如线程A和线程B同时查询到此时选课表里面isSelect为0,那么他们就会同时往下执行,这样就出现了一人多选的情况。
解决:
-
从查询是否重复选课,到最后的插入选课表这里需要加锁。
-
这里我为了方便使用了synchornized锁,但是使用synchornized需要考虑一些问题:
-
问题1:锁的对象是谁?如果把后面的代码封装为一个方法,加锁加在方法体上面,那么锁住的是this,这样显然会大大降低服务的性能,因为锁的范围太大了。将锁的范围缩小,锁住当前用户即可。也就是锁住一个字符串对象,这个字符串对象可以拼接当前用户的id,以此来缩小锁的范围提升服务的性能。
-
问题2: 为什么要把后面这一段业务代码封装为一个方法,直接锁住代码段不行吗?答案是不行,因为我们需要考虑事务。事务的提交是在锁释放之后才提交的,也就是到最后return出去才会提交,但是如果锁释放了,事务没来得及提交,此时其他线程冲进来一顿操作,就破坏了我们前一次事务了。所以必须把后面的一段业务封装起来,将Transactional注解加在它的头上
-
问题3: 事务加在它的头上,此时还能用this来调用吗?不行,如果此时使用this调用,那么事务就会失效,所以我们要用代理对象来调用它。
-
问题4: 使用了synchornized,假如服务器做集群部署(水平拓展) 锁是会失效的。因为synchornized锁住了同一台JVM的监视器,如果做了水平拓展,还是可能出现同一用户同时持有多把锁的现象。解决的办法:后面可以改用redisson分布式锁来代替它,但是现在先用synchornized就行。
-
-
考虑完以上的一些问题,接下来就改写代码。
首先项目改一下,因为我们需要使用代理对象
添加依赖:
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjweaver</artifactId>
</dependency>
添加注解到启动类:暴露代理对象
@EnableAspectJAutoProxy(exposeProxy = true)
改造代码:
@Override
public ResponseResult selectCourse(SelectParmas selectParmas) {
if (Objects.isNull(selectParmas)){
return new ResponseResult(500,"传入参数有误!");
}
//1. 检查课程限选人数是否已经超标
Course course = courseMapper.findByNo(selectParmas.getCno()+"");
//1.1 获取当前可选人数
int limitSelects = course.getLimitSelects();
//1.2 检查可选人数是否已经<1
if (limitSelects<1){
return new ResponseResult(500,"本课人数已满!");
}
//拼接锁对象
String lockKey=LOCK_PREFIX+selectParmas.getUno();
//获取代理对象
CourseService proxy = (CourseService) AopContext.currentProxy();
//加锁
synchronized (lockKey.intern()){
return proxy.checkRebuildAndUpdate(selectParmas,course);
}
}
@Transactional
public ResponseResult checkRebuildAndUpdate(SelectParmas selectParmas, Course course) {
//2. 检查是否有重复选课
//2.1 查询当前用户是否已经选了这门课
int isSelect = studentCourseMapper.findByUnoAndCno(selectParmas);
if (isSelect >0){
return new ResponseResult(500,"不能重复选课哦");
}
//3. 扣减课程表可选人数
//3.1 乐观锁扣减可选数
int cnt = courseMapper.subSelects(course.getCno());
if (cnt<1){
//说明扣减失败,直接返回
return new ResponseResult(500,"本课人数已满!");
}
//4. 插入选课信息表(传入用户id和课程的id)
studentCourseMapper.insert(selectParmas);
return new ResponseResult(200,"选课成功!");
}
张三再次发起进攻(1000线程)
后台日志 事务有效:
数据库检查结果:
成功制裁张三。
流程总结
-
用户点击抢课
-
检查课程已选的人数是否超标
-
如果此时可选人数已经<1,那么直接返回。
-
否则,可以继续做下面的判断。
-
-
检查是否有重复选课
-
如果在选课表中查询到用户的信息,直接返回。
-
-
乐观锁扣减课程的可选人数
update course set limitSelects = limitSelects-1 where cno=#{cno} and limitSelects>0 -
插入选课表
redis优化
经过以上的修修改改,我们解决了高并发下超选和一人多选的问题。但是此时系统的性能如何呢?
再次做一次压测,因为为了制裁张三我们给业务加了锁,此时性能应该会下降很多。
平均值干到了3225ms,确实恐怖。难怪学校每次开发抢课的时候,次次服务器崩溃。而且我这还只是1000次,像是我们学校的抢课,抢讲座,并发量都是上万的。所以,优化是必须要优化的。
再次看看上面的流程:
-
检查课程已选的人数是否超标 (查询数据库)
-
检查是否有重复选课 (查询数据库)
-
乐观锁扣减课程的可选人数 (写数据库)
-
插入选课表 (写数据库)
分析:前两步操作都是查询数据库,后两步是写数据库。并且要命的是,这几步操作是同步的。
优化思路:
-
将前两步的查询操作,放到redis里去查询。
-
可以这样做的原因: 抢课活动不是经常进行的,我们可以在学校开放抢课活动的时候将需要被抢的课程信息存进redis里面。
-
-
将后两步写操作,改成异步,放入到消息队列。这里使用redis的stream数据类型作为消息队列来使用。
-
可以这样做的原因: 当我们做完上面的抢课资格检查之后,写数据操作事实上是可以异步的。并且由于前面已经做好了资格的检查,所以也不会出现并发问题。因为消息队列里面,最多也就是200条(具体看业务)待处理的任务需要进行。
-
修改代码
业务代码修改
-
检查课程已选的人数是否超标,可以提前在redis里面先存放课程的可选量
-
使用简单的string类型
-
键为课程id 值为当前的可选量
-
-
检查是否有重复选课
-
选课信息使用set类型
-
整个set的键为: 当前课程的id,值存放用户的id
-
以上两步归纳成选课资格的检查,学生获得选课资格之后,就可以将这个消息塞进消息队列里面丢给异步线程去执行,而此时也可以直接返回抢课成功的结果给前端了。
代码改写:
-
添加课程信息到redis
-
LUA脚本
-
为了进一步提升性能,我们可以将选课资格的检查,添加到消息队列这段业务写进LUA脚本,减少查询redis的次数,并且还能保证这段业务的原子性。
-
--- 参数列表
local courseId=ARGV[1]
local userId=ARGV[2]
--- 拼接课程信息key(可选人数信息)
local courseKey="SECKILL:COURSE:".. courseId
--- 拼接选课表key(选课成功的Set集合)
local setKey="SELECTED:COURSE:" .. courseId
--- 1.检查已选人数是否已经超标(是否可选已经<1)
if (tonumber(redis.call('GET',courseKey))<1) then
--- 1.1 如果已经<1 直接返回1
return 1
end
--- 2.检查当前用户是否存在重复抢课
if (redis.call('SISMEMBER',setKey,userId)>0) then
return 2
end
--- 3.扣减可选人数
redis.call('INCRBY',courseKey,-1)
--- 4.添加用户id进Set
redis.call('SADD',setKey,userId)
--- 5.添加到stream队列
redis.call('XADD','stream.course','*','cno',courseId,'uno',userId)
--- 6.返回0 表示有资格选课
return 0
-
在java里面,定义一个脚本
private final static DefaultRedisScript<Long>SCKILL_COURSE_SCRIPT;
static {
SCKILL_COURSE_SCRIPT=new DefaultRedisScript<>();
SCKILL_COURSE_SCRIPT.setLocation(new ClassPathResource("seckill_course.lua"));
SCKILL_COURSE_SCRIPT.setResultType(Long.class);
}
//代理对象,防止事务失效
private CourseService proxy;
-
改完之后的抢课业务变得很精简:
/*
抢课业务
*/
@Override
public ResponseResult selectCourse(SelectParmas selectParmas) {
if (Objects.isNull(selectParmas)){
return new ResponseResult(500,"传入参数有误!");
}
//1. 检查抢课资格
//1.1 执行LUA脚本
Long result = stringRedisTemplate.execute(
SCKILL_COURSE_SCRIPT,
Collections.emptyList(),
selectParmas.getCno().toString(),
selectParmas.getUno()
);
//1.2 检查执行结果
if (result!=0){
return new ResponseResult(500,result==1?"当前课程选课已满!":"一人只能选一次!");
}
proxy = (CourseService) AopContext.currentProxy();
//2 直接返回成功即可
return new ResponseResult(200,"抢课成功!");
}
异步线程执行消息队列中的任务
-
我们上面只是完成了抢课资格的检查,最后返回结果,但是实际上数据库里还没有做出修改。
-
由于我们已经在消息队列里面添加了课程id和用户id,所以此时我们要做的事情就是从消息队列取出未被消费的课程id和用户id,然后将他们写进数据库。这个过程是循环执行的。这里我们先定义一个任务类,在run方法里调用执行任务的方法。具体的代码在下面。
-
/* 需要处理的异步任务 */ private class SelectCourseTask implements Runnable{ private final String streamKey="stream.course"; @Override public void run() { //todo 执行构建订单的任务 handlerCourseTask(streamKey); } }
-
-
这里我使用线程池来执行这一任务,并且执行的时机是本类一加载就开始执行。于是使用到SpringBoot的PostConstruct注解。
-
private final static ExecutorService BUILD_COURSE_SELECTED_POOL= Executors.newFixedThreadPool(1); //单线程执行 @PostConstruct //本类一加载就开始执行任务 private void initTask(){ BUILD_COURSE_SELECTED_POOL.submit(new SelectCourseTask()); }
-
-
再来看看任务的具体业务代码应该怎么写:
-
首先这一段业务应该是一个死循环,因为我们需要不断从消息队列里取出消息来消费。
-
其次,我们需要考虑消息被消费了,但是没有被确认(ACK),此时这个消息对应的任务可能实际上没有被执行,所以我们需要在循环里面捕获异常,并且继续完成这个未被确认的消息的任务。
-
所以整段业务的流程是这样:
-
进入循环后,从消息队列中取出未被消费的消息(课程id,用户id)
-
将拿到的课程id和用户id去构建订单,并且扣减课程表的课程可选人数。注意这里应该封装成一段事务代码去提交。并且还要注意事务失效的问题,因为此时在提交事务的不是主线程而是子线程,而代理对象的获取是通过ThreadLocal来获取的。所以需要提前获取代理对象,这一步在前面已经做好了。 proxy = (CourseService) AopContext.currentProxy();
-
从消息队列中取出这个消息的id
-
最终确认这个消息被消费(ACK)
-
需要捕获异常,并且在里面继续执行pedding-list中的消息,这一段跟我们从消息队列里取出未被消费的消息事实上是一样的,只不过循环需要被终止,而且取出来的不是未被消费的消息而是未被确认的消息。
-
-
-
代码:
private void handlerCourseTask(String streamKey) {
while (true){
//死循环 一直执行消息队列里面的任务
try {
//读取最后一个被消费的
List<MapRecord<String, Object, Object>> recordList = stringRedisTemplate.opsForStream()
.read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1).block(Duration.ofMillis(200)),
StreamOffset.create(streamKey, ReadOffset.lastConsumed())
);
if (recordList==null||recordList.isEmpty()){
//没有获取到消息,继续等待
continue;
}
MapRecord<String, Object, Object> record = recordList.get(0);
//获取消息,构建选课信息表
Map<Object, Object> mp = record.getValue();
String cno = (String) mp.get("cno");
String uno = (String) mp.get("uno");
//使用代理对象构建消息
buildSckillCourse(cno,uno);
//最后需要确认这个消息已经被消费
stringRedisTemplate.opsForStream().acknowledge(
streamKey,
"g1",
record.getId()
);
}catch (Exception e){
log.debug("获取消息消费异常..",e);
//如果出现异常,需要处理pedding-list中还未被确认的消息
handlerPeddinglist(streamKey);
}
}
}
private void buildSckillCourse(String cno, String uno) {
String lock=LOCK_PREFIX+uno;
synchronized (lock){
//这里还是需要代理对象来调用这个方法,防止事务失效
proxy.helpBuild(cno, uno);
}
}
/*
事务代码
*/
@Transactional
public void helpBuild(String cno, String uno) {
//扣减可选人数,构建课程信息表
//1.检查是否有重复选课
//1.1查询当前用户是否已经选了这门课
SelectParmas selectParmas = new SelectParmas();
selectParmas.setCno(Integer.valueOf(cno));
selectParmas.setUno(uno);
int isSelect = studentCourseMapper.findByUnoAndCno(selectParmas);
if (isSelect >0){
return;
}
//2. 扣减课程表可选人数
//2.1 乐观锁扣减可选数
int cnt = courseMapper.subSelects(Integer.valueOf(cno));
if (cnt<1){
//说明扣减失败,直接返回
return;
}
//3. 插入选课信息表(传入用户id和课程的id)
studentCourseMapper.insert(selectParmas);
return;
}
处理pedding-list:
private void handlerPeddinglist(String streamKey) {
while (true){
//死循环 一直执行消息队列里面的任务
try {
//读取未被确认的消息
List<MapRecord<String, Object, Object>> recordList = stringRedisTemplate.opsForStream()
.read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1),
StreamOffset.create(streamKey, ReadOffset.from("0"))
);
if (recordList==null||recordList.isEmpty()){
//没有获取到消息,直接退出循环
break;
}
MapRecord<String, Object, Object> record = recordList.get(0);
//获取消息,构建选课信息表
Map<Object, Object> mp = record.getValue();
String cno = (String) mp.get("cno");
String uno = (String) mp.get("uno");
buildSckillCourse(cno,uno);
//最后需要确认这个消息已经被消费
stringRedisTemplate.opsForStream().acknowledge(
streamKey,
"g1",
record.getId()
);
}catch (Exception e){
log.debug("pedding-list消费异常..",e);
try {
Thread.sleep(200);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
}
}
测试
继续用jmeter做压测,最终结果:
平均值225ms,比起之前的3200ms,性能提高了十几倍。
查看redis:
没有出现问题。再来看看mysql。
依旧没有问题。
再来用张三发起攻击试试,看看有没有一人多抢问题
可以看到,张三也被制裁了。
使用分布式锁
针对使用synchornized可能出现的问题,在这里使用分布式锁解决一下。
我们可以自己设计一个简单的分布式锁来解决我们的需求,它需要具备以下特点:
-
可以解决分布式环境下synchornized的失效问题
-
性能需要高效
-
具备高可用的特性,不能出现安全问题
以上三个特点,很显然可以利用redis来做到。
设计思路如下:
-
利用redis的setnx命令来作为获取锁的标识。
-
当用户释放锁时,移除掉这个key就行。
-
用expire存活时间来兜底。
-
锁的键-值 这两个数值的设计
-
锁的键最好配合业务名称+业务id来设计,所以它不能是一成不变的。这里我们需要锁的是用户id,所以需要加上用户id,减小锁的粒度。这里我使用业务名+用户名来作为锁的key
-
锁的值需要具备唯一性,而不能随便设计。因为我们需要在释放锁的过程中与锁的值做一次确认,防止前一个线程误删当前线程的锁的现象发生。 这里我使用一段UUID的随机字符串+当前线程id来作为锁的value ,因为线程id是会自增的,所以可以确保唯一性。
-
最后,由于锁的释放是两步过程,还是有可能出现并发问题。
-
查询锁的value
-
删除锁
-
-
假如遇到JVM的FULL GC发生阻塞,导致expire时间到自动删除,而GC完成之后,再一次执行删除锁的动作,那么还是会出现误删锁的现象。
-
所以需要把两步过程变成原子性的——LUA脚本解决。
-
锁接口:
public interface ILock {
boolean tryLock(long seconds);
void unlock();
}
锁实现:
/*
简单版分布式锁
*/
public class SimpleRedisLock implements ILock{
private String name;
private StringRedisTemplate stringRedisTemplate;
public final static String KEY_PREFIX="lock:";
public final static String LOCK_PREFIX= UUID.randomUUID().toString(true);
private final static DefaultRedisScript LUA_SCRIPT;
static {
UNLOCK_SCRIPT=new DefaultRedisScript();
UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
UNLOCK_SCRIPT.setResultType(Long.class);
}
public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
this.name = name;
this.stringRedisTemplate = stringRedisTemplate;
}
@Override
public boolean tryLock(long seconds) {
//1.获取线程标识
String value = LOCK_PREFIX+Thread.currentThread().getId();
//2.setnx
Boolean isok = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, value, seconds, TimeUnit.SECONDS);
return isok==null?false:isok;
}
@Override
public void unlock() {
//使用lua脚本完成
stringRedisTemplate.execute(
UNLOCK_SCRIPT,
Collections.singletonList(KEY_PREFIX + name), //设置KEY
LOCK_PREFIX+Thread.currentThread().getId() //设置VALUE
);
}
}
LUA脚本
-- 这里的 KEYS[1] 就是锁的key,就是当前线程标示
-- 获取锁中的标示,判断是否与当前线程标示一致
if (redis.call('GET', KEYS[1]) == ARGV[1]) then
-- 一致,则删除锁
return redis.call('DEL', KEYS[1])
end
-- 不一致,则直接返回
return 0
修改代码
事实上需要修改的地方只有一个
前后对比:
private void buildSckillCourse(String cno, String uno) {
String lock=LOCK_PREFIX+uno;
synchronized (lock){
//这里还是需要代理对象来调用这个方法,防止事务失效
proxy.helpBuild(cno, uno);
}
}
private void buildSckillCourse(String cno, String uno) {
//传进去业务名,课程id,用户id
SimpleRedisLock lock=new SimpleRedisLock("course:"+cno+":"+uno,stringRedisTemplate);
try {
boolean isLock = lock.tryLock(10);
if (isLock){
//获取锁成功才能构建订单,不需要重试
proxy.helpBuild(cno, uno);
}
} finally {
lock.unlock();
}
}
最后再做高并发测试和张三测试,没有出现问题。
事实上,我的这个简单分布式锁还存在一些缺陷:
-
不支持锁重入
-
没有重试机制
-
用来兜底的存活时间事实上是需要做更新的,因为还有可能出现业务阻塞导致超时自动删除锁的可能。
这些缺陷,都在redisson分布式锁得到了解决。可以使用redisson来替代自己编写的锁。redisson这里不作过多展开。
总结
-
高并发的业务确实非常复杂,因为在高并发的情况下,需要考虑的问题太多。其中主要包含
-
数据一致性问题
-
系统的性能
-
系统的可靠性
-
-
一旦加入了中间件,就需要考虑数据的一致性问题,所以原本简简单单的一段业务代码就被扩展得很庞大。
-
优化思路可以总结如下:
-
将查询数据库的动作换成查询redis,减少查询数据库的次数。
-
利用消息队列,改同步为异步,尽快地返回结果。
-
-
存在的不足
-
redis毕竟不是专业做消息队列的中间件,可以考虑改成RabbitMQ,RocketMQ,Kafka等消息队列。
-
为了系统的高可用,最好还是做redis集群,这里我仅仅使用了一个redis,存在redis宕机的风险。
-
本人是java菜鸟,如果还存在不足,欢迎各位指教
-