在高并发场景下,超卖和一人一单是两个典型的并发问题。为了解决这两个问题,我们可以使用乐观锁(CAS)和悲观锁,这两者分别有不同的实现方式和适用场景。下面我们详细介绍如何通过 乐观锁(CAS) 和 悲观锁 来解决这两个问题。
假设我们有一张库存表
seckill_voucher
,其中包含字段:
voucher_id
: 优惠券IDstock
: 库存数量CREATE TABLE seckill_voucher ( voucher_id BIGINT PRIMARY KEY, stock INT, );
这里我们使用自定义的全局唯一ID生成器来创建缓存中的key:
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
@Component
public class RedisIdWorker {
// 开始时间戳
private static final long BEGIN_TIMESTAMP = 1735689600L; // 2025.01.01.00.00.00
// 序列号的位数
private static final long COUNT_BITS = 32;
private final StringRedisTemplate stringRedisTemplate;
public RedisIdWorker(StringRedisTemplate stringRedisTemplate) {
this.stringRedisTemplate = stringRedisTemplate;
}
// 全局唯一ID生成器:(long)符号位+时间戳+序列号
public long nextId(String keyPrefix) {
// 1.生成时间戳
LocalDateTime now = LocalDateTime.now();
long nowSecond = now.toEpochSecond(ZoneOffset.UTC); // 获取当前秒数
long timestamp = nowSecond - BEGIN_TIMESTAMP;
// 2.生成序列号
// 2.1 获取当前日期,精确到天
String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));
// 2.2 自增长
long count = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date);
// 3.拼接并返回
return timestamp << COUNT_BITS | count;
}
// public static void main(String[] args) {
// LocalDateTime localDateTime = LocalDateTime.of(2025, 1, 1, 0, 0, 0);
// long second = localDateTime.toEpochSecond(ZoneOffset.UTC);
// System.out.println(second); // 1735689600
// }
}
我们正常的一个用户秒杀代码操作如下:
查询优惠卷信息 -> 判断秒杀是否开始 -> 判断秒杀是否结束 -> 判断秒杀卷是否充足 -> 扣减库存 -> 创建订单
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
@Autowired
private ISeckillVoucherService seckillVoucherService;
@Autowired
private RedisIdWorker redisIdWorker;
@Override
public Result seckillVoucher(Long voucherId) {
// 1.查询优惠券
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
// 2.判断秒杀是否开始
if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
// 尚未开始
return Result.fail("秒杀尚未开始!");
}
// 3.判断秒杀是否已经结束
if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
// 尚未开始
return Result.fail("秒杀已经结束!");
}
// 4.判断库存是否充足
if (voucher.getStock() < 1) {
// 库存不足
return Result.fail("库存不足!");
}
return createVoucherOrder(voucherId);
}
private Result createVoucherOrder(Long voucherId) {
//5,扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock= stock -1")
.eq("voucher_id", voucherId).update();
if (!success) {
//扣减库存
return Result.fail("库存不足!");
}
//6.创建订单
VoucherOrder voucherOrder = new VoucherOrder();
// 6.1.订单id
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
// 6.2.用户id
Long userInId = UserHolder.getUser().getId();
voucherOrder.setUserId(userInId);
// 6.3.代金券id
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);
return Result.ok(orderId);
}
}
但是对于上面代码存在很大的问题:
如上图,假设商品剩余数量为1,那么对于多个线程同时对仅剩一件进行竞争,会导致线程安全问题,这个时候我们就会使用锁来解决,而锁主要分为两大类:悲观锁与乐观锁。
那么接下来我们使用乐观锁解决上述问题:
在大多数情况下,数据基本不会发生冲突,因此在更新操作前不加锁,而是在提交时验证数据是否有冲突。如果数据被其他线程更新修改则操作失败,调用方可以选择重试或返回错误。乐观锁的常见实现方式是使用 CAS(Compare-And-Swap)。
乐观锁的关键在于判断之前查询得到的数据是否有被修改过。
高并发支持:CAS操作不需要加锁,因此适合高并发场景,减少了锁竞争,提高了性能。
无死锁:与悲观锁相比,CAS不会发生死锁问题,因为它不需要锁住资源。
通过上面对于问题以及CAS解决方案的分析介绍,我们在扣减库存之前,需要对stock库存量进行判断,是否为查询到的值,如果为查询到的值,那么就能证明在此期间未曾被修改过。
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
@Autowired
private ISeckillVoucherService seckillVoucherService;
@Autowired
private RedisIdWorker redisIdWorker;
@Override
public Result seckillVoucher(Long voucherId) {
// 1.查询优惠券
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
// 2.判断秒杀是否开始
if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
// 尚未开始
return Result.fail("秒杀尚未开始!");
}
// 3.判断秒杀是否已经结束
if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
// 尚未开始
return Result.fail("秒杀已经结束!");
}
// 4.判断库存是否充足
if (voucher.getStock() < 1) {
// 库存不足
return Result.fail("库存不足!");
}
return createVoucherOrder(voucherId);
}
private Result createVoucherOrder(Long voucherId) {
// 一人一单
Long userId = UserHolder.getUser().getId();
// 查询订单
int count = query().eq("user_id",userId).eq("voucher_id", voucherId).count();
// 判断是否存在
if(count > 0){
// 用户已经购买过
return Result.fail("用户已经购买过");
}
//5,扣减库存
// boolean success = seckillVoucherService.update()
// .setSql("stock= stock -1")
// .eq("voucher_id", voucherId).update();
// 乐观锁(操作前先判断是否有更新操作) -> CAS方法(局限:多个线程只能卖出一次)
boolean success = seckillVoucherService.update()
.setSql("stock= stock -1") // set stock = stock - 1
.eq("voucher_id", voucherId).eq("stock",voucher.getStock()) // where id = ? and stock = ?
.update();
if (!success) {
//扣减库存
return Result.fail("库存不足!");
}
//6.创建订单
// ...
}
}
但是注意,虽然我们对线程安全问题进行了处理,但是这样处理后的结果是对数据库产生的压力增大,并且假设有一百个线程同时对库存量为100的商品进行操作,那么大概率只能有一个线程能够成功(因为同一时间只能有一个被修改),这样操作一波后库存量变为99,成功率极低,并且对于业务处理很不友好,那么这个时候我们就需要将数据库操作语句更改为stock > 0,就可以解决上述问题,只对于商品的临界数进行限制就可以很好的处理该问题。
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
@Autowired
private ISeckillVoucherService seckillVoucherService;
@Autowired
private RedisIdWorker redisIdWorker;
@Override
public Result seckillVoucher(Long voucherId) {
// 1.查询优惠券
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
// 2.判断秒杀是否开始
if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
// 尚未开始
return Result.fail("秒杀尚未开始!");
}
// 3.判断秒杀是否已经结束
if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
// 尚未开始
return Result.fail("秒杀已经结束!");
}
// 4.判断库存是否充足
if (voucher.getStock() < 1) {
// 库存不足
return Result.fail("库存不足!");
}
return createVoucherOrder(voucherId);
}
private Result createVoucherOrder(Long voucherId) {
// 一人一单
Long userId = UserHolder.getUser().getId();
// 查询订单
int count = query().eq("user_id",userId).eq("voucher_id", voucherId).count();
// 判断是否存在
if(count > 0){
// 用户已经购买过
return Result.fail("用户已经购买过");
}
//5,扣减库存
// 乐观锁(操作前先判断是否有更新操作) -> CAS方法(局限:多个线程只能卖出一次)
// boolean success = seckillVoucherService.update()
// .setSql("stock= stock -1") // set stock = stock - 1
// .eq("voucher_id", voucherId).eq("stock",voucher.getStock()) // where id = ? and stock = ?
// .update();
// CAS改进:将库存判断改成stock > 0以此来提高性能
boolean success = seckillVoucherService.update()
.setSql("stock= stock -1") // set stock = stock - 1
.eq("voucher_id", voucherId).eq("stock",0) // where id = ? and stock > 0
.update();
if (!success) {
//扣减库存
return Result.fail("库存不足!");
}
//6.创建订单
// ...
}
}
通过上面方法我们成功解决了超卖场景的高并发导致的线程安全问题,那么接下来我们就要解决一人一单的问题了。
对于一人一单问题要求一个用户只能购买一次某个优惠券。在上述代码中,我们可以通过查询数据库中是否已经有用户购买的记录来避免一个用户重复购买:
查询优惠卷信息 -> 判断秒杀是否开始 -> 判断秒杀是否结束 -> 判断秒杀卷是否充足 -> 判断是否满足一人一单 -> 扣减库存 -> 创建订单
如果同一用户已经存在购买记录,则不能再次购买,避免了一人多单问题,并且这个逻辑也与 乐观锁(CAS) 配合使用,确保用户只能成功创建一次订单。
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
@Autowired
private ISeckillVoucherService seckillVoucherService;
@Autowired
private RedisIdWorker redisIdWorker;
@Override
public Result seckillVoucher(Long voucherId) {
// ...
return createVoucherOrder(voucherId);
}
}
private Result createVoucherOrder(Long voucherId) {
// 一人一单处理代码
Long userId = UserHolder.getUser().getId();
// 查询订单
int count = query().eq("user_id",userId).eq("voucher_id", voucherId).count();
// 判断是否存在
if(count > 0){
// 用户已经购买过
return Result.fail("用户已经购买过");
}
//5,扣减库存
// CAS改进:将库存判断改成stock > 0以此来提高性能
boolean success = seckillVoucherService.update()
.setSql("stock= stock -1") // set stock = stock - 1
.eq("voucher_id", voucherId).eq("stock",0) // where id = ? and stock > 0
.update();
if (!success) {
//扣减库存
return Result.fail("库存不足!");
}
//6.创建订单
// ...
}
}
那么同样的道理,在高并发的场景下,假设有100个用户都没有购买过,那么 if 语句就不会拦截,虽然通过 eq("stock", 0)
确保只有当库存大于0时才会成功扣减库存,但是在高并发情况下,仍然可能会发生以下几种情况:
-
重复购买:多个用户几乎同时请求购买同一张优惠券,虽然你检查了库存并执行了扣减操作,但在高并发时,库存检查和扣减操作之间的时间差可能非常小。如果两个请求几乎同时到达,可能会先检查库存是否大于0然后再进行扣减,而两者在检查时库存都大于0,导致两次扣减操作都成功,从而导致超卖。
-
一人多单:虽然你通过查询订单判断用户是否已经购买过,但在高并发的情况下,如果两个请求几乎同时发起,在检查订单时两个请求都没有找到订单(因为数据库操作存在延迟),导致两者都认为用户没有购买过,从而都创建了订单。
这些步骤虽然在逻辑上是连贯的,但它们并不是原子性的,尤其是在高并发时。即使有乐观锁来处理库存问题,但对于 一人一单 的逻辑,仍然可能存在数据库查询和插入的竞争条件。两个请求同时检测到用户没有购买过,分别尝试创建订单,这时即便库存只有一张,还是可能出现两个订单。
那么这个时候我们就需要加悲观锁来将下面步骤锁上,以此来同时处理:
synchronized { 判断是否满足一人一单 -> 扣减库存 -> 创建订单 }
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
@Autowired
private ISeckillVoucherService seckillVoucherService;
@Autowired
private RedisIdWorker redisIdWorker;
@Override
public Result seckillVoucher(Long voucherId) {
// ...
return createVoucherOrder(voucherId);
}
@Transactional
public synchronized Result createVoucherOrder(Long voucherId) {
// 一人一单
Long userId = UserHolder.getUser().getId();
// 查询订单
int count = query().eq("user_id",userId).eq("voucher_id", voucherId).count();
// 判断是否存在
if(count > 0){
// 用户已经购买过
return Result.fail("用户已经购买过");
}
//5,扣减库存
// CAS改进:将库存判断改成stock > 0以此来提高性能
boolean success = seckillVoucherService.update()
.setSql("stock= stock -1") // set stock = stock - 1
.eq("voucher_id", voucherId).eq("stock",0) // where id = ? and stock > 0
.update();
if (!success) {
//扣减库存
return Result.fail("库存不足!");
}
//6.创建订单
// ...
}
}
这样操作后,每个用户的订单创建请求都会被顺序处理,在同一时刻只能有一个线程可执行该方法,其他线程会被阻塞。但这个方案的缺点是锁的粒度过大,可能会导致性能瓶颈,特别是在高并发的情况下。
这种情况我们可以通过 synchronized (userId.toString().intern())
锁定每个用户 ID。这个锁的粒度是针对单个用户的。当多个线程尝试处理同一用户的订单时,它们会被串行化处理,其他用户的操作不会受到影响。如果多个线程操作不同的用户,那么它们的操作仍然可以并发进行。因为我们只需要限制对于一个用户的多个线程不能同时进行扣减库存以及创建订单操作就可以了。
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
@Autowired
private ISeckillVoucherService seckillVoucherService;
@Autowired
private RedisIdWorker redisIdWorker;
@Override
public Result seckillVoucher(Long voucherId) {
// 1.查询优惠券
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
// 2.判断秒杀是否开始
if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
// 尚未开始
return Result.fail("秒杀尚未开始!");
}
// 3.判断秒杀是否已经结束
if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
// 尚未开始
return Result.fail("秒杀已经结束!");
}
// 4.判断库存是否充足
if (voucher.getStock() < 1) {
// 库存不足
return Result.fail("库存不足!");
}
Long userId = UserHolder.getUser().getId();
synchronized (userId.toString().intern()){
IVoucherOrderService proxy = (IVoucherOrderService)AopContext.currentProxy(); // 获取代理对象 (需要引入aspectjweaver依赖并在实现类加入@EnableAspectJAutoProxy(exposeProxy = true))
return proxy.createVoucherOrder(voucherId);
}
}
@Transactional
public Result createVoucherOrder(Long voucherId) {
// 一人一单
Long userId = UserHolder.getUser().getId();
synchronized (userId.toString().intern()){
// 查询订单
int count = query().eq("user_id",userId).eq("voucher_id", voucherId).count();
// 判断是否存在
if(count > 0){
// 用户已经购买过
return Result.fail("用户已经购买过");
}
//5,扣减库存
// boolean success = seckillVoucherService.update()
// .setSql("stock= stock -1")
// .eq("voucher_id", voucherId).update();
// 乐观锁(操作前先判断是否有更新操作) -> CAS方法(局限:多个线程只能卖出一次)
// boolean success = seckillVoucherService.update()
// .setSql("stock= stock -1") // set stock = stock - 1
// .eq("voucher_id", voucherId).eq("stock",voucher.getStock()) // where id = ? and stock = ?
// .update();
// CAS改进:将库存判断改成stock > 0以此来提高性能
boolean success = seckillVoucherService.update()
.setSql("stock= stock -1") // set stock = stock - 1
.eq("voucher_id", voucherId).eq("stock",0) // where id = ? and stock > 0
.update();
if (!success) {
//扣减库存
return Result.fail("库存不足!");
}
//6.创建订单
VoucherOrder voucherOrder = new VoucherOrder();
// 6.1.订单id
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
// 6.2.用户id
Long userInId = UserHolder.getUser().getId();
voucherOrder.setUserId(userInId);
// 6.3.代金券id
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);
return Result.ok(orderId);
}
}
}
两种加锁位置的不同,为什么加在方法内部较好?
① 对于事务方面,两者都使用了
@Transactional
注解来确保事务的一致性和原子性。事务的本质是保证操作的 一致性,即在整个操作过程中,如果出现异常可以回滚。因此,@Transactional
确保了 订单创建、库存扣减等操作的事务性,即使使用了锁,也不会改变操作的原子性。然而,方法级锁(第二种方式)可能会引发 长时间的锁竞争,特别是在库存扣减、订单查询等操作较多时,可能会导致 事务执行时间过长,这会影响性能。② 对于细粒度控制,在第二种方式中,锁的粒度更细,锁定的范围仅限于特定的用户。不同用户的请求可以并发处理,提高了系统的吞吐量。而在第一种方式中,整个方法都被锁住了,所有请求都必须排队等待,影响了系统的并发能力。
锁定用户ID(第二种方式)则避免了不同用户之间的锁竞争,只会针对同一个用户的请求加锁。
为什么要这么写userId.toString().intern()
① userId.toString()
会将Long
类型的userId
转换为字符串。这样做是因为在 Java 中,synchronized
关键字要求锁住的对象必须是一个 对象(Object),而userId
是Long
类型的原始数据类型(primitive),不能直接用于synchronized
的锁定。所以我们需要先将它转换为String
类型。另外一个原因就是因为对于每一个请求接收到的线程内的user都是全新的user对象,是变化的,而值是固定的,所以我们给他转换为String类型,但是toString()方法的底层最后是返回 new 一个String对象,而这样肯定也不可以作为锁的条件,所以我们要再字符串池内拿到锁的实例,因为实例是唯一的,就需要调用下面的方法intern()
。String lock1 = userId.toString(); // 新创建一个字符串对象 String lock2 = userId.toString(); // 再次创建一个新的字符串对象
② intern()
方法的作用是返回该字符串在 JVM 字符串池中的唯一引用。JVM 会为所有常量字符串(如"abc"
)以及通过intern()
方法处理过的字符串,保证它们在内存中只有一份唯一的实例。当我们调用intern()
时,JVM 会检查字符串池中是否已经存在这个字符串。如果存在,就返回这个池中的实例;如果不存在,则将这个字符串加入池中。String lock1 = userId.toString().intern(); // 锁定唯一的字符串对象 String lock2 = userId.toString().intern(); // 锁定同一个字符串对象
③ 在这里,
userId.toString().intern()
的目的就是利用 字符串池 来确保针对相同userId
的锁是 唯一的。通过这种方式,不同线程针对同一个userId
发起请求时,它们会共享同一个锁对象。并且通过使用字符串池,可以减少内存占用,同时避免每次都为每个userId
创建新的锁对象,这样在并发的情况下,加锁和释放锁的效率会更高。
但是这样之后还有一个问题,我们在处理完这三部操作后,锁就会打开,这个时候其他的线程就可以进来,但是因为事务是整个方法,此时一但事务没有提交结束,其他线程直接进入锁,就会造成线程安全问题,所以我们需要将这个锁的范围扩大:
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
@Autowired
private ISeckillVoucherService seckillVoucherService;
@Autowired
private RedisIdWorker redisIdWorker;
@Override
public Result seckillVoucher(Long voucherId) {
// ...
Long userId = UserHolder.getUser().getId();
synchronized (userId.toString().intern()){
return this.createVoucherOrder(voucherId);
}
}
@Transactional
public Result createVoucherOrder(Long voucherId) {
// ...
}
}
但是我们只是对 createVoucher 函数加入事务处理而并没有给 seckillVoucher 函数加入,所以 seckillVoucher 函数调用 createVoucher 函数是使用 this 进行调用,而这个 this 仅是当前类对象,而不是该类的代理对象,因为事务如果想要生效,Spring 是对当前类做一个动态代理进而拿到其代理对象,随后使用其代理对象来进行事务处理,所以这个时候就需要我们拿到事务的代理对象:
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
@Autowired
private ISeckillVoucherService seckillVoucherService;
@Autowired
private RedisIdWorker redisIdWorker;
@Override
public Result seckillVoucher(Long voucherId) {
// ...
Long userId = UserHolder.getUser().getId();
synchronized (userId.toString().intern()){
// 获取当前类的代理对象
// 需要引入aspectjweaver依赖,并且在实现类加入@EnableAspectJAutoProxy(exposeProxy = true)以此来暴露代理对象
IVoucherOrderService proxy = (IVoucherOrderService)AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
}
}
@Transactional
public Result createVoucherOrder(Long voucherId) {
// ...
}
}
通过上面所讲述使用悲观锁与乐观锁可以很好的解决单机情况下的一人一单问题,但是在集群模式就不行了,这是为什么呢?
首先我们可以模拟一下集群模式,打开idea,按住ALT+8打开服务框,点击+号,选中Springboot添加就能够出来端口:
之后点击该项目按住Ctrl+D:
因为我的是2024年的idea,需要点击修改选项,然后点击覆盖配置属性:
那么这样就可以出现集群了:
随后运行我们的代码可以发现,当我们同时访问的一个nginx代理的8080端口后,我们可以发现下面的锁是产生了问题:
synchronized (userId.toString().intern()){
// 获取当前类的代理对象
// 需要引入aspectjweaver依赖,并且在实现类加入@EnableAspectJAutoProxy(exposeProxy = true)以此来暴露代理对象
IVoucherOrderService proxy = (IVoucherOrderService)AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
}
在单体项目的情况,我们只是对一个JVM虚拟机内的锁监视器进行操作,而在多集群模式下就会有多个所监视器,这样的锁就无法正确的使用了,如下图:
那么我们如何让锁在两个甚至多个集群下进行使用来达到我们能够锁住呢?
这个时候我们就需要使用一个能够跨JVM也就是跨进程的锁 --- redis分布式锁。
有关redis分布式锁的内容请看下篇博客: