文章目录
- 高并发场景秒杀抢购超卖Bug
- 高并发场景秒杀抢购Demo
- 测试结果
- JVM级别锁
- 使用nginx对本地服务进行负载均衡
- Redis实现分布式锁
- Redis分布式锁实现Demo
- Redis分布式锁有关问题
- 分布式锁性能的提升
- 减少锁的粒度
- 使用异步处理
高并发场景秒杀抢购超卖Bug
在今天的数字化世界中,高并发已经成为了许多在线应用的常态,尤其是在电商平台的秒杀活动、票务系统的抢票环节,或者任何需要处理大量用户请求的场景中。然而,高并发也带来了一系列的挑战,其中最常见的就是超卖问题。
想象一下,你正在举办一个大型的秒杀活动,数万名用户在同一时间抢购同一款限量的商品。在理想的情况下,系统应该能够正确地处理所有的请求,确保商品的库存数量不会被超额扣减。然而,现实情况往往并非如此。如果没有有效的并发控制机制,你的系统可能会在短时间内接收到大量的购买请求,导致商品的库存数量被超额扣减,即出现超卖现象。这不仅会影响到你的业务运营,也会对用户体验产生负面影响。
高并发场景秒杀抢购Demo
以下是一个高并发场景场景秒杀抢购的一个Demo:
这里我用了一个子项目继承了父项目
父pom.xml
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.1.1</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
子pom.xml
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
</dependencies>
下面就是代码了:
StockController.class
@RestController
@RequestMapping("/stock")
public class StockController {
@Autowired
StockService stockService;
@RequestMapping("/deduct")
public String deductStock(){
return stockService.deductStock();
}
}
StockService.class
@Service
public class StockService {
@Autowired
StringRedisTemplate stringRedisTemplate;
public String deductStock(){
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
if(stock>0){
int realStock = stock -1;
stringRedisTemplate.opsForValue().set("stock",realStock+"");
System.out.println("扣减成功,剩余库存:"+realStock);
}else {
System.out.println("扣减失败,库存不足");
}
return "end";
}
}
这里使用了redis,给redis中stock的值是200
然后用Apache JMeter压测工具进行压测,我这里是让500个线程在1s之内启动然后去争抢200个库存
为了更直观的看出到底售卖出了多少商品,我在redis中存放了一个 key
为 ok
,value
为 0
的值,然后修改StockService,每次售卖出一件商品的时候给redis
中的ok
+1。
测试结果
从控制台的输出打印可以看出出现了大量的超卖问题,同一个商品被卖出了多次,200个商品卖完,竟然卖出了313件。
那么,如何解决这个问题呢?
JVM级别锁
学过并发,会想到使用synchronized
或者ReentrantLock
,这样的JVM级别的锁来解决这个问题。
进行压测会发现问题解决了,200件商品的确是卖出去了200次,并没有出现超卖问题。但是如果在高并发场景下,使用了分布式架构。有多个tomcat
,JVM级别的锁还有用吗。
使用nginx对本地服务进行负载均衡
这里我启动了两个服务器,使用了不同端口,一个8080,一个8090
然后本地启动了一台虚拟机,这台虚拟机用来做本地的负载均衡使用
虚拟机已经安装好了nginx,首先ipconfig
找到本机的局域网ip地址,在nginx.conf 配置上对该ip地址的8080端口和8090端口进行负载均衡。
最后访问虚拟机的局域网地址和nginx.conf 配置的端口号即可。虚拟机的局域网地址可以通过ifconfig
命令查询
不停的访问该地址可以看到两个服务器上都有打印输出,现在对该接口进行压测发现200个商品被卖出去了245次。
JVM级别的锁只能管理本tomcat的线程,其他服务器的线程是没有办法管理的。那应该使用什么呢?
Redis实现分布式锁
Redis提供了一种名为SETNX的命令,可以用来实现分布式锁。
Redis分布式锁实现Demo
@Service
public class StockService {
@Autowired
StringRedisTemplate stringRedisTemplate;
public String deductStock() {
String lockKey = "lock:product_1";
Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "root");
if (!result) {
return "error_code";
}
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
if (stock > 0) {
int realStock = stock - 1;
stringRedisTemplate.opsForValue().set("stock", realStock + "");
stringRedisTemplate.opsForValue().increment("ok");
System.out.println("扣减成功,剩余库存:" + realStock);
} else {
System.out.println("扣减失败,库存不足");
}
stringRedisTemplate.delete(lockKey);
return "end";
}
}
Redis分布式锁有关问题
当然这里还有很多问题
问题1:通过SETNX
上锁以后,中间代码如果出现异常,导致后面的删除锁代码无法执行,导致死锁。
解决方法:添加try-catch,并将删除锁代码放在finally
中,这样无论有没有抛异常都会释放锁。
问题2:通过SETNX
上锁以后,运行过程出现 宕机 ,系统被 重启 ,同样将导致 死锁。
解决方法:给锁 设置超时时间,过了一段时间以后,锁将自动释放。
问题3:通过SETNX上锁以后 出现异常 ,导致无法去expire设置锁的超时时间,也没有办法去手动释放锁,导致 死锁。
解决方法:保证上锁和设置超时时间的 原子性,使用Boolean setIfAbsent(K key, V value, long timeout, TimeUnit unit);
方法。
问题4:线程A中 上锁设置过期时间 完成以后,系统出现阻塞,导致锁已经到了过期时间并自动删除了,这时候还没执行释放锁的操作,这时候线程B上锁成功,并执行任务,结果线程A反应过来了,继续执行释放锁的操作,把线程B上的锁给释放了,后面的线程上的锁都被前面的线程释放。
解决方法:这个问题的根本点在于,线程可以释放其他线程所加的锁,可以给锁添加uuid,让线程只能释放自己加的锁。
问题5:线程A 上锁设置过期时间并执行任务后,希望判断比较锁的id之后去释放锁,判断通过以后系统出现阻塞,阻塞到 锁已经过期了,但是此时并未执行释放锁的操作,此时线程B上锁成功,并去执行任务。线程A反应了过来,然后将线程B上的锁给释放了,这时候又将出现上面的问题。
解决方法:这个问题的根本原因在于 最后判断锁id的时候和释放锁的操作没有保证 原子性 。使用redis执行LUA脚本,保证 能同时执行判断锁和释放锁。这个redis并没有提供方法。
问题6:锁续命,线程A任务还没执行完,锁已经过期了,此时其他线程也会执行任务,就会出现并发安全问题。
解决方法:可以使用WatchDog,也就是给任务执行线程添加守护线程,守护线程负责对锁的expire时间进行监控,每当到过期前一秒就对过期进行判断,如果任务还在进行且锁马上过期,就对过期时间进行设置。
上面的问题5和问题6都可以通过redis组件,redisson
解决
分布式锁性能的提升
减少锁的粒度
上述场景可以使用分段锁。基于加锁的分段机制,可以给分布式锁的性能提升几十倍。将一个商品分成好几个key。比如一个商品1000个库存,拆成10个key,每个key放100个库存。
实际实现还是有许多细节,比如对key的轮询选择,如果一个key库存为0了,就不应该再选择这个库存了。如果每个key库存都只有1,要减少5个库存的解决。实现可以参考ConcurrentHashMap
1.7的源码。
使用异步处理
你可以使用消息队列等技术,将请求的处理过程异步化。当一个请求到达时,你只需要将它放入消息队列,然后立即返回。这样,你的系统就可以快速地处理大量的并发请求。然后,你可以在后台有一个或多个工作线程,从消息队列中取出请求并处理。