高并发秒杀使用RabbitMQ的优化思路
- 一、判断是否重复抢购(防止一人多次秒杀)的逻辑
- 1. 整体逻辑代码
- 2. 原始判断重复抢购的方式:
- 3. 后来优化为什么用 Redis 判断?
- 二、高并发下优化过的秒杀逻辑
- 1.秒杀核心逻辑(请求入口)
- 2.系统初始化逻辑(项目启动时调用)
- 3. 整体流程
- 三、 RabbitMQ 秒杀消息的发送与消费逻辑
- 1. `RabbitMQConfig.java`:配置 RabbitMQ 消息队列
- 2. `MQSender.java`:发送秒杀消息
- 3. `MQReceiver.java`:接收秒杀消息并处理
- 4. 秒杀消息处理完整流程总结
- 5. 重复判断问题
- 四、 秒杀系统异步下单+轮询查询结果
- 1.整体流程图
- 2. `/result` 轮询接口逻辑详解
- 3. 对应的 `getResult()` 方法逻辑:
- 4. 为什么要这样设计?
- 五、Redis分布式锁的原子性操作
- 缘起:
- 1. 问题场景:高并发秒杀下的“扣库存”
- 2. 核心问题:这不是一个**原子操作**
- 多线程并发问题示意:
- 结果:
- 3. 根本原因:Redis 这些操作不是“原子性的”
- 4. 解决思路:加锁(分布式锁)
- 效果:
- 5. 再进一步:为什么普通锁也不够,还要加唯一值 + Lua 脚本?
- 第一步:最基础的锁实现
- 目的:
- 存在的问题:
- 第二步:加上过期时间(自动释放锁)
- 改进点:
- 新的问题:
- 第三步:引入唯一标识防止误删(UUID)
- 改进点:
- 新的问题:
- 第四步:用 Lua 脚本原子释放锁
- 改进点:
- 总结
一、判断是否重复抢购(防止一人多次秒杀)的逻辑
1. 整体逻辑代码
// 查询指定商品的详细信息,包括秒杀价格、库存等
GoodsVo goods = goodsService.findGoodsVoByGoodsId(goodsId);
// 判断商品库存是否充足(即是否还有剩余可秒杀的数量)
if (goods.getStockCount() < 1) {
// 库存不足,返回秒杀失败,提示库存为空
return RespBean.error(RespBeanEnum.EMPTY_STOCK);
}
// 判断当前用户是否已经秒杀过该商品(防止重复抢购)
// 注释掉的是原来的数据库方式判断:
// SeckillOrder seckillOrder = seckillOrderService.getOne(new
// QueryWrapper<SeckillOrder>().eq("user_id", user.getId()).eq("goods_id", goodsId));
// 使用 Redis 判断是否已经下过秒杀订单
// 拼接 Redis key:order:用户id:商品id
String seckillOrderJson = (String) redisTemplate.opsForValue().get("order:" + user.getId() + ":" + goodsId);
// 判断 Redis 中是否存在该 key 的值,说明该用户已经抢购过
if (!StringUtils.isEmpty(seckillOrderJson)) {
// 存在记录,说明重复抢购,返回错误提示
return RespBean.error(RespBeanEnum.REPEATE_ERROR);
}
// 正常进入下单流程,调用秒杀下单服务
Order order = orderService.seckill(user, goods);
// 如果下单成功,返回成功响应以及订单对象
if (null != order) {
return RespBean.success(order);
}
这段代码整体逻辑顺序如下:
- 获取商品详情;
- 判断库存;
- 判断是否重复下单(现在是用 Redis);
- 调用下单逻辑;
- 返回结果。
2. 原始判断重复抢购的方式:
// 从数据库中查找是否已经存在该用户对该商品的秒杀订单
SeckillOrder seckillOrder = seckillOrderService.getOne(
new QueryWrapper<SeckillOrder>()
.eq("user_id", user.getId())
.eq("goods_id", goodsId)
);
new QueryWrapper<SeckillOrder>() // 创建一个查询构造器,用于构造 SeckillOrder 表的查询条件
.eq("user_id", user.getId()) // 添加查询条件:字段 user_id 等于当前用户的 ID(即查询该用户的记录)
.eq("goods_id", goodsId) // 添加查询条件:字段 goods_id 等于当前商品的 ID(即查询该商品的记录)
这段代码等价于 SQL 中的:
SELECT * FROM seckill_order
WHERE user_id = 当前用户ID AND goods_id = 当前商品ID;
从秒杀订单表中查询 user_id 等于当前用户,且 goods_id 等于当前商品 的记录。
-
逻辑解释:
SeckillOrder
表是秒杀订单表,设置了唯一索引,同一个用户,对同一件商品,只能有一条秒杀订单记录。
CREATE TABLE seckill_order (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
user_id BIGINT NOT NULL,
goods_id BIGINT NOT NULL,
order_id BIGINT NOT NULL,
-- 其他字段 ...
UNIQUE KEY uniq_user_goods (user_id, goods_id)
);
- 如果
seckillOrder != null
,就说明用户已经抢购过:
if (seckillOrder != null) {
return RespBean.error(RespBeanEnum.REPEATE_ERROR);
}
3. 后来优化为什么用 Redis 判断?
String seckillOrderJson = (String) redisTemplate.opsForValue()
.get("order:" + user.getId() + ":" + goodsId);
if (!StringUtils.isEmpty(seckillOrderJson)) {
return RespBean.error(RespBeanEnum.REPEATE_ERROR);
}
-
优点:
- 性能更高:不查数据库,改查 Redis,速度更快,减轻数据库压力。
- 适合高并发场景:秒杀场景下请求量大,Redis 更适合高并发判断。
二、高并发下优化过的秒杀逻辑
1.秒杀核心逻辑(请求入口)
// 获取 Redis 中的操作对象,用于字符串类型操作
ValueOperations valueOperations = redisTemplate.opsForValue();
// -------- 判断是否重复抢购 --------
// 从 Redis 中获取该用户是否已经抢购过该商品
String seckillOrderJson = (String) valueOperations.get("order:" + user.getId() + ":" + goodsId);
// 如果已经存在该用户对该商品的订单,说明是重复抢购
if (!StringUtils.isEmpty(seckillOrderJson)) {
return RespBean.error(RespBeanEnum.REPEATE_ERROR); // 返回“重复秒杀”错误
}
// -------- 内存标记减少 Redis 访问 --------
// 如果内存中的标记已经说明该商品没有库存了,直接返回,减少对 Redis 的访问
if (EmptyStockMap.get(goodsId)) {
return RespBean.error(RespBeanEnum.EMPTY_STOCK); // 返回“库存为空”错误
}
// -------- 预减库存(Redis 预扣减) --------
// 对 Redis 中的商品库存执行递减操作
Long stock = valueOperations.decrement("seckillGoods:" + goodsId);
// 如果库存扣减后小于 0,说明库存已被抢光
if (stock < 0) {
// 设置内存标记,后续请求就不再访问 Redis 了
EmptyStockMap.put(goodsId, true);
// 回滚 Redis 中的库存(因为刚才减了一次)
valueOperations.increment("seckillGoods:" + goodsId);
return RespBean.error(RespBeanEnum.EMPTY_STOCK); // 返回“库存为空”错误
}
// -------- 请求入队(异步下单) --------
// 创建秒杀消息对象,封装用户和商品信息
SeckillMessage message = new SeckillMessage(user, goodsId);
// 发送消息到 RabbitMQ 队列,让后端异步去处理下单逻辑
mqSender.sendsecKillMessage(JsonUtil.object2JsonStr(message));
// 秒杀请求排队中,立即返回成功(前端可以轮询查询是否下单成功)
return RespBean.success(0);
2.系统初始化逻辑(项目启动时调用)
// 实现 InitializingBean 接口的 afterPropertiesSet 方法,在 Spring 初始化 Bean 后执行
@Override
public void afterPropertiesSet() throws Exception {
// 查询所有参与秒杀的商品列表
List<GoodsVo> list = goodsService.findGoodsVo();
// 如果商品列表为空,直接返回
if (CollectionUtils.isEmpty(list)) {
return;
}
// 遍历每个商品,将库存数量加载到 Redis,同时初始化内存标记为“有库存”
list.forEach(goodsVo -> {
// Redis 中设置商品库存,key 是 seckillGoods:商品ID,value 是库存数量
redisTemplate.opsForValue().set("seckillGoods:" + goodsVo.getId(),
goodsVo.getStockCount());
// 内存中标记该商品有库存(false 表示“未被标记为无库存”)
EmptyStockMap.put(goodsVo.getId(), false);
});
}
3. 整体流程
阶段 | 技术 | 作用 |
---|---|---|
重复抢购校验 | Redis + 用户ID-商品ID 键 | 高效判断是否已经秒杀过 |
库存控制 | Redis decrement | 避免并发超卖 |
内存标记 | EmptyStockMap | 避免频繁访问 Redis |
异步处理 | RabbitMQ + 秒杀消息对象 | 将核心下单操作交由后端异步处理,减轻主线程压力 |
初始化 | Redis 预加载 | 提前加载秒杀商品库存,提升响应速度 |
三、 RabbitMQ 秒杀消息的发送与消费逻辑
1. RabbitMQConfig.java
:配置 RabbitMQ 消息队列
package com.xxxxx.seckill.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* RabbitMQ 配置类
* 配置队列、交换机和绑定关系
* 用于秒杀系统的消息异步处理
*/
@Configuration
public class RabbitMQConfig {
// 定义队列名称常量
private static final String QUEUE = "seckillQueue";
// 定义交换机名称常量
private static final String EXCHANGE = "seckillExchange";
/**
* 定义一个名为 seckillQueue 的队列
* @return 队列对象
*/
@Bean
public Queue queue(){
return new Queue(QUEUE);
}
/**
* 将队列与交换机进行绑定,并设置路由键为 seckill.#
* 意味着所有以 seckill. 开头的消息都会被路由到 seckillQueue 队列中
*/
@Bean
public Binding binding01(){
return BindingBuilder
.bind(queue()) // 绑定队列
.to(topicExchange()) // 指定交换机
.with("seckill.#"); // 路由键匹配规则:以 seckill. 开头的所有消息
}
}
2. MQSender.java
:发送秒杀消息
package com.xxxxx.seckill.rabbitmq;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* 消息发送者(将秒杀请求异步发送到 RabbitMQ)
*/
@Service
@Slf4j
public class MQSender {
// 注入 RabbitTemplate,用于操作 RabbitMQ
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 发送秒杀消息
* @param message 消息体(通常是包含用户和商品ID的 JSON 字符串)
*/
public void sendsecKillMessage(String message) {
log.info("发送消息:" + message); // 打印日志,便于调试
// 发送消息到交换机 seckillExchange,使用 routingKey 为 seckill.msg
rabbitTemplate.convertAndSend("seckillExchange", "seckill.msg", message);
}
}
3. MQReceiver.java
:接收秒杀消息并处理
package com.xxxxx.seckill.rabbitmq;
import com.xxxxx.seckill.pojo.User;
import com.xxxxx.seckill.service.IGoodsService;
import com.xxxxx.seckill.service.IOrderService;
import com.xxxxx.seckill.util.JsonUtil;
import com.xxxxx.seckill.vo.GoodsVo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
这些是基本的包导入,含业务服务类、工具类和 Redis 组件。
/**
* 消息接收者(从 RabbitMQ 获取秒杀请求并处理)
*/
@Service
@Slf4j
public class MQReceiver {
@Autowired
private IGoodsService goodsService;
@Autowired
private RedisTemplate redisTemplate;
@Autowired
private IOrderService orderService;
/**
* 消费者监听 seckillQueue 队列
* 接收到消息后开始处理秒杀逻辑
*/
@RabbitListener(queues = "seckillQueue")
public void receive(String msg) {
log.info("QUEUE接受消息:" + msg); // 打印日志
// 将 JSON 字符串反序列化成 SeckillMessage 对象
SeckillMessage message = JsonUtil.jsonStr2Object(msg, SeckillMessage.class);
// 从消息中提取商品ID和用户信息
Long goodsId = message.getGoodsId();
User user = message.getUser();
// 查询商品详情(包括秒杀库存等)
GoodsVo goods = goodsService.findGoodsVoByGoodsId(goodsId);
// ------- 判断库存是否足够 -------
if (goods.getStockCount() < 1) {
return; // 库存不足,直接返回,不再继续处理
}
// ------- 判断是否重复秒杀 -------
// 使用 Redis 判断该用户是否已抢购该商品(Redis中有记录则表示已经下单)
String seckillOrderJson = (String)
redisTemplate.opsForValue().get("order:" + user.getId() + ":" + goodsId);
if (!StringUtils.isEmpty(seckillOrderJson)) {
return; // 已经秒杀过了,直接返回
}
// ------- 执行秒杀下单逻辑 -------
// 调用订单服务,完成订单生成、库存扣减等
orderService.seckill(user, goods);
}
}
4. 秒杀消息处理完整流程总结
步骤 | 说明 |
---|---|
1. 前端点击“秒杀”按钮 | 请求发送到后台秒杀接口 |
2. 后台进行校验 | 包括是否重复抢购、库存校验、内存标记 |
3. 校验通过后发送消息 | 使用 MQSender 发送消息到 RabbitMQ |
4. 消费端监听 seckillQueue | 使用 @RabbitListener 自动接收消息 |
5. 反序列化消息 | 转换为 SeckillMessage 对象 |
6. 查询商品信息 | 获取库存 |
7. 再次校验是否重复秒杀、库存是否足够 | |
8. 执行下单逻辑 | 调用 orderService.seckill() 进行下单入库、更新 Redis 等操作 |
5. 重复判断问题
已经在接口层(controller/service)对库存是否足够和是否重复秒杀做了一次判断,为什么在 MQReceiver.java 里还要再判断一遍呢?
答案可以用一句话总结:
因为消息队列是异步的,接口层的判断并不能保证最终数据一致性。真正的“抢购成功”必须由消息消费方进行最终确认。
- 第一次判断(接口层
秒杀接口
中):
// 1. 判断是否重复秒杀(Redis 中存在这个用户和商品的订单)
if (!StringUtils.isEmpty(seckillOrderJson)) {
return RespBean.error(RespBeanEnum.REPEATE_ERROR);
}
// 2. 判断库存是否为 0(Redis 预减库存)
Long stock = valueOperations.decrement("seckillGoods:" + goodsId);
if (stock < 0) {
EmptyStockMap.put(goodsId,true);
valueOperations.increment("seckillGoods:" + goodsId);
return RespBean.error(RespBeanEnum.EMPTY_STOCK);
}
// 3. 通过 RabbitMQ 异步下单
mqSender.sendsecKillMessage(JsonUtil.object2JsonStr(message));
这个阶段主要是为了快速响应用户请求、限流、预拦截非法操作。因为秒杀高并发,不能所有请求都进入数据库,先通过 Redis 做一轮筛选。
- 第二次判断(消费方
MQReceiver.java
中):
// 1. 获取商品库存信息(查数据库)
if (goods.getStockCount() < 1) {
return;
}
// 2. 再次判断是否已经秒杀(Redis 或数据库确认)
String seckillOrderJson = (String)
redisTemplate.opsForValue().get("order:" + user.getId() + ":" + goodsId);
if (!StringUtils.isEmpty(seckillOrderJson)) {
return;
}
-
为什么还要再次判断?
-
数据最终一致性的保障(兜底逻辑)
- 用户发起请求时可能网络延迟、并发穿透、Redis 短暂未同步等问题,导致多个请求都通过了前端判断。
- 如果消费方不再次判断,就可能出现超卖或重复下单!
-
防止 Redis 与数据库数据不一致
- Redis 是缓存,最终写入的订单和库存数据必须以数据库为准。
- Redis 中库存可能出现错误(例如手动清空 Redis 缓存后重启服务),但数据库是强一致的。
-
避免“多次秒杀”绕过逻辑
- 如果用户用不同终端 / IP 并发请求,有可能绕过前端检查,甚至模拟请求。
- 所以最后还是得由消费方从数据库或 Redis再次校验。
-
- 总结:两层判断是为了兼顾性能 + 数据安全
层级 | 处理位置 | 作用 | 缺点 | 优点 |
---|---|---|---|---|
第一次 | 接口层(Controller / Service) | 快速判断,提高性能,减轻 MQ 和数据库压力 | 数据不一定可靠 | 响应快,限流效果好 |
第二次 | MQReceiver 消费者端 | 最终判断是否成功抢购 | 响应慢(异步) | 保证数据一致性,防止超卖和重复秒杀 |
四、 秒杀系统异步下单+轮询查询结果
1.整体流程图
用户点击「秒杀」按钮
|
发起 /doSeckill 请求(通常是 POST)
|
秒杀服务判断幂等、库存、入队
|
秒杀消息被投递到 MQ(如RabbitMQ)
|
---异步处理开始---
|
MQReceiver 消费消息
|
判断库存是否充足、是否重复秒杀
|
创建订单 & 秒杀订单 & 写Redis标记
|
---异步处理结束---
|
客户端开始定时轮询 /result 接口(GET)
|
ISeckillOrderService.getResult(user, goodsId)
|
Redis中判断是否库存为空 or 查询订单记录
|
返回三种状态:
✔️ 订单ID:成功
❌ -1:失败(库存为空)
⏳ 0:排队中(异步线程尚未处理完)
2. /result
轮询接口逻辑详解
你提供的 /result
Controller:
@RequestMapping(value = "/result", method = RequestMethod.GET)
@ResponseBody
public RespBean getResult(User user, Long goodsId) {
if (user == null) {
return RespBean.error(RespBeanEnum.SESSION_ERROR);
}
Long orderId = seckillOrderService.getResult(user, goodsId);
return RespBean.success(orderId);
}
3. 对应的 getResult()
方法逻辑:
@Override
public Long getResult(User user, Long goodsId) {
// 1. 从数据库中查询是否已经生成秒杀订单
SeckillOrder seckillOrder = seckillOrderMapper.selectOne(
new QueryWrapper<SeckillOrder>().eq("user_id", user.getId()).eq("goods_id", goodsId)
);
if (seckillOrder != null) {
return seckillOrder.getId(); // 秒杀成功,返回订单ID
}
// 2. 如果Redis中标记了库存为空,说明秒杀失败
if (redisTemplate.hasKey("isStockEmpty:" + goodsId)) {
return -1L; // 秒杀失败
}
// 3. 否则仍在排队中
return 0L;
}
4. 为什么要这样设计?
- 异步处理下单:减少数据库压力,防止并发写入造成阻塞。
- 轮询查询结果:前端不断请求
/result
来获得是否秒杀成功。 - Redis做标记:
isStockEmpty:goodsId
:快速失败标记,防止浪费时间排队。order:userId:goodsId
:避免重复秒杀,保证幂等性。
- 高并发友好:因为下单操作是异步的,客户端轮询不会阻塞主线程,也减少数据库压力。
五、Redis分布式锁的原子性操作
缘起:
上面代码实际演示会发现Redis的库存有问题,原因在于Redis没有做到原子性。
1. 问题场景:高并发秒杀下的“扣库存”
假设你在做一个“秒杀”活动,商品库存是 10
,使用 Redis 存储库存数量:
set stock 10
每当一个用户下单时,就会执行如下操作:
int stock = Integer.parseInt(redisTemplate.opsForValue().get("stock"));
if (stock > 0) {
redisTemplate.opsForValue().set("stock", stock - 1);
}
2. 核心问题:这不是一个原子操作
上面代码是三个步骤:
- 读取库存:
stock = 10
- 判断是否大于0
- 更新库存:
stock = 9
多线程并发问题示意:
假设两个线程 A 和 B 几乎同时执行:
时间顺序 | 线程 A | 线程 B |
---|---|---|
T1 | 读取 stock=10 | |
T2 | 读取 stock=10 | |
T3 | 判断 >0 | 判断 >0 |
T4 | 写入 stock=9 | 写入 stock=9 |
结果:
虽然来了两个用户,正确的逻辑应该库存变为 8
,但实际却被覆盖成了 9
,相当于少扣了一次库存(出现超卖/重复卖的问题)。
3. 根本原因:Redis 这些操作不是“原子性的”
Redis 单个命令是原子性的,但你把多个命令组合起来执行时(如 get
+ if
+ set
)就不是原子操作了。
也就是说:
多个命令之间线程是可以插队的,这就导致了并发安全问题。
4. 解决思路:加锁(分布式锁)
为了解决这个并发问题,我们引入锁机制:
if (get lock成功) {
// 执行:get stock -> check -> set stock
// 释放锁
}
效果:
- 同一时间只有一个线程能进来执行扣库存
- 其他线程只能等或者返回“库存紧张,请稍后再试”
5. 再进一步:为什么普通锁也不够,还要加唯一值 + Lua 脚本?
因为如下问题:
- 如果执行慢,锁自动过期,其他线程进来了,但旧线程还在执行
- 如果释放锁不判断是否自己加的,可能误删别人的锁
所以最终需要用:
- 唯一标识(UUID)绑定线程
- Lua 脚本保证删除锁是原子操作
第一步:最基础的锁实现
Boolean isLock = valueOperations.setIfAbsent("k1", "v1");
目的:
- 使用 Redis 的
SETNX
(set if not exists)机制实现分布式锁。 - 如果返回
true
,就认为加锁成功,进入临界区,操作完后手动del
删除锁。
存在的问题:
- 没有过期时间,如果线程意外挂掉(如异常、宕机)就会造成死锁。
- 没有考虑并发线程之间的唯一标识,锁可能会被误删。
第二步:加上过期时间(自动释放锁)
Boolean isLock = valueOperations.setIfAbsent("k1", "v1", 5, TimeUnit.SECONDS);
改进点:
- 给锁设置了5秒过期时间,防止程序异常导致锁无法释放。
- 这个使用了
RedisTemplate.setIfAbsent(K key, V value, long timeout, TimeUnit unit)
方法。
新的问题:
- 如果业务处理时间超过5秒,锁就会提前过期被释放,导致下一个线程以为可以进来,造成多个线程并发执行临界区代码,违背加锁初衷。
第三步:引入唯一标识防止误删(UUID)
String uuid = UUID.randomUUID().toString();
Boolean isLock = redisTemplate.opsForValue().setIfAbsent("k1", uuid, 5, TimeUnit.SECONDS);
...
// 释放锁前先判断value
String value = (String) redisTemplate.opsForValue().get("k1");
if (uuid.equals(value)) {
redisTemplate.delete("k1");
}
改进点:
- 给每个线程生成一个 唯一标识(UUID),只允许加锁的线程自己释放锁。
- 解决了“线程A释放线程B锁”的问题。
新的问题:
get
+delete
是两个独立操作,中间可能有线程切换,仍然有并发安全问题,无法保证原子性!
第四步:用 Lua 脚本原子释放锁
String luaScript = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
redisTemplate.execute(redisScript, Collections.singletonList("k1"), uuid);
改进点:
- 使用 Lua 脚本将“判断 + 删除”打包成一个 Redis 原子操作。
- Redis 保证 Lua 脚本执行期间不会有其他命令插入,彻底解决误删问题。
- 实现了真正意义上的线程安全和可靠释放锁。
总结
阶段 | 代码关键点 | 解决问题 | 遗留问题 |
---|---|---|---|
1️⃣ 初始锁 setIfAbsent(k, v) | 实现基本分布式锁 | 没有过期时间,可能死锁 | |
2️⃣ 加过期时间 setIfAbsent(k, v, timeout) | 防止死锁 | 业务执行慢时锁可能提前释放 | |
3️⃣ 加唯一值(UUID) + get + delete | 防止误删他人锁 | get+delete 非原子 | |
4️⃣ Lua 脚本判断+删除 | 完整原子释放锁,最终完善版本 | 基础 Redis 实现,后续可封装成工具 |