异步优化秒杀业务
回顾之前的内容黑马点评 秒杀优惠券集群下一人一单超卖问题-CSDN博客,为了处理并发情况下的线程安全和数据一致性的问题,我们已经完成了查询优惠券信息、判断秒杀是否开始和结束、检查库存、用户ID加锁、创建订单和扣减库存。
尽管之前的代码已经解决了基础问题,但在高并发情景下仍有瓶颈,接下来就需要对性能和稳定性做进一步的提升。所以接下来将使用异步优化秒杀业务。
为什么需要异步优化
1. 同步阻塞导致吞吐量低
-
问题:用户请求需等待所有数据库操作完成(如订单写入、库存扣减),线程长时间阻塞,系统吞吐量受限。
-
优化目标:
-
立即响应:用户请求快速返回,耗时操作异步执行。
-
提升吞吐量:将数据库压力转移至后台,Tomcat 线程快速释放,可处理更多请求。
-
2. 数据库成为性能瓶颈
-
问题:每次请求直接操作数据库,高并发时连接池耗尽、慢查询堆积。
-
优化目标:
-
削峰填谷:通过消息队列缓存请求,平滑流量高峰。
-
异步写库:批量处理订单,减少事务提交次数,降低数据库负载。
-
3. 用户体验差
-
问题:用户需等待秒杀所有逻辑完成(通常200ms~1s),高并发时易超时或失败。
-
优化目标:
-
快速反馈:用户点击后立即返回“请求已受理”,订单状态后续异步通知(如短信或站内信)。
-
补充:
同步:任务按顺序执行,必须等待前一个任务完成后,才能开始下一个任务。
问题:
所有操作都在同一个线程中完成,用户必须等待所有步骤结束才能得到响应。
高并发时,数据库压力大,响应时间变长,用户体验差。
异步:任务提交后立即返回,后续操作由其他线程或服务在后台处理。
优势:
用户无需等待数据库操作完成,响应速度极快(毫秒级)。
数据库压力被削峰填谷,避免高并发直接冲击。
优化任务如下:
阻塞队列Java BlockingQueue
1. 定义:阻塞队列是一种线程安全的数据结构,能平衡生产者与消费者的处理速度,避免资源耗尽或数据丢失。它支持以下操作:
- 入队(Put):队列满时,生产者线程会被阻塞,直到队列有空位。
- 出队(Take):队列空时,消费者线程会被阻塞,直到队列有新数据。
2. 在秒杀业务中的使用:
- 生产者:Tomcat 服务,快速校验后将订单任务写入队列。
- 消费者:后台线程或服务,从队列中取出任务并处理(扣库存、写订单)。
seckill.lua:
-- 1.参数列表
-- 优惠券id
local voucherId = ARGV[1]
-- 用户id
local userId = ARGV[2]
-- 2.数据key
-- 库存key
local stockKey = 'seckill:stock:' .. voucherId
-- 订单key
local orderKey = 'seckill:order:' .. voucherId
-- 3.脚本业务逻辑
-- 判断库存是否充足 get stockKey
if(tonumber(redis.call('get', stockKey)) <= 0) then
-- 库存不足,返回1
return 1
end
-- 判断用户是否已下单 SISMEMBER orderKey userId
if(redis.call('sismember', orderKey, userId) == 1) then
-- 用户重复下单,返回2
return 2
end
-- 扣减库存,并判断扣减是否成功 INCRBY stockKey -1
redis.call("incrby", stockKey, -1)
-- 下单,保存用户id到集合 SADD orderKey userId
redis.call("sadd", orderKey, userId)
return 0
改造后的VoucherServiceImpl.java:
@Service
@Slf4j
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
@Resource
private ISeckillVoucherService seckillVoucherService;
@Resource
private RedisIdWorker redisIdWorker;
@Resource
private StringRedisTemplate stringRedisTemplate;
@Resource
private RedissonClient redissonClient;
/**
* 加载Lua脚本:执行脚本保证原子性操作
*/
private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
//静态初始化块
static {
SECKILL_SCRIPT = new DefaultRedisScript<>(); //创建实例
SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua")); //设置脚本文件位置
SECKILL_SCRIPT.setResultType(Long.class); //设置返回类型为Long
}
/**
* 创建阻塞队列:异步下单,提高效率
*/
private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024 * 1024);
/**
* 创建单线程线程池
*/
private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();
/**
* 在VoucherServiceImpl类初始化时执行线程池
*/
@PostConstruct
private void init() {
SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}
/**
* 线程池异步执行优惠券订单任务
*/
private class VoucherOrderHandler implements Runnable {
@Override
public void run() {
while (true) {
try {
//1.获取阻塞队列中的订单信息
VoucherOrder voucherOrder = orderTasks.take();
//2.创建订单
handleVoucherOrder(voucherOrder);
} catch (Exception e) {
log.error("处理订单异常", e);
}
}
}
}
/**
* 创建订单
*
* @param voucherOrder
*/
private void handleVoucherOrder(VoucherOrder voucherOrder) {
//1.获取用户id,这里是多线程不能在ThreadLocal中直接获得当前用户id
Long userId = voucherOrder.getUserId();
//2.创建锁对象(锁键设计:"lock:order:" + userId 以用户ID为粒度,不同用户请求可并行)
RLock lock = redissonClient.getLock(LOCK_ORDER_KEY + userId);
//3.获取锁(无参表示只尝试获取锁一次)
boolean isLock = lock.tryLock();
//4.判断获取锁是否成功
if (!isLock) {
// 获取锁失败,直接返回失败或者重试
log.error("不允许重复下单");
return;
}
try {
//获取锁成功,创建订单
proxy.createVoucherOrder(voucherOrder);
} finally {
//释放锁
lock.unlock();
}
}
private IVoucherOrderService proxy;
/**
* 抢购秒杀优惠券
*
* @param voucherId
* @return
*/
public Result seckillVoucher(Long voucherId) {
//获取用户id
Long userId = UserHolder.getUser().getId();
//1.执行lua脚本
Long result = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(), userId.toString());
//2.判断结果是否为0
//2.1不为0,代表没有抢购资格
// result为1表示库存不足,result为2表示用户已下单
int r = result.intValue();
if (r != 0) {
switch (r) {
case 1:
return Result.fail("库存不足");
case 2:
return Result.fail("不能重复下单");
/*case 3:
return Result.fail("活动结束");*/
}
}
//2.2 result为0,用户具有秒杀资格,将订单信息(订单id,优惠券id,用户id)保存到阻塞队列中,实现异步下单
//3.创建订单(在订单表tb_voucher_order插入一条数据)
VoucherOrder voucherOrder = new VoucherOrder();
//3.1 订单id
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
//3.2 用户id
voucherOrder.setUserId(userId);
//3.3 代金券id
voucherOrder.setVoucherId(voucherId);
// 3.4放入阻塞队列
orderTasks.add(voucherOrder);
//4.获取代理对象,使用代理对象调用第三方事务方法,防止事务失效
proxy = (IVoucherOrderService) AopContext.currentProxy(); //获取当前类的代理对象
//5.返回订单id
return Result.ok(orderId);
}
/**
* 通过数据库查询确保“一人一单”
*
* @param voucherOrder
*/
@Transactional
public void createVoucherOrder(VoucherOrder voucherOrder) {
//5.一人一单
Long userId = voucherOrder.getUserId();
//5.1查询数据库中是否已经存在该用户抢购该优惠券的订单
int count = query().eq("user_id", userId).eq("voucher_id", voucherOrder.getVoucherId()).count();
//5.2判断是否存在
if (count > 0) {
//用户已经购买过了,返回失败信息
log.error("用户已购买!");
return;
}
//6.扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1") //set stock = stock - 1
.eq("voucher_id", voucherOrder.getVoucherId()).gt("stock", 0) //where id = ? and stock > 0 数据库层面的乐观锁,避免超卖
.update();
if (!success) {
//库存扣减失败
log.error("库存不足!");
return;
}
//7.创建订单(在订单表tb_voucher_order插入一条数据)
//插入到订单信息表
save(voucherOrder);
}
}
使用阻塞队列存在的问题
1. 内存限制与数据丢失风险
-
内存溢出(OOM):
阻塞队列存储在JVM内存中,当瞬时流量极大(如百万级请求),队列可能迅速占满内存,导致服务崩溃。 -
数据丢失:
若服务宕机或重启,内存中的队列数据全部丢失,未处理的订单请求无法恢复。
2. 单机瓶颈
-
无法水平扩展:
阻塞队列基于单机内存,无法跨多台服务器共享。当单机处理能力不足时,无法通过增加节点扩展吞吐量。 -
消费者能力受限:
消费者线程池受限于单机CPU和内存资源,无法灵活应对流量高峰。
3. 可靠性不足
-
无持久化机制:
消息仅存在于内存,缺乏磁盘持久化,无法应对服务崩溃或断电。 -
缺乏重试与死信处理:
若消息处理失败(如数据库异常),无法自动重试或转移至死信队列,需手动实现补偿逻辑。
消息队列优化
消息队列是一种在分布式系统中用于在不同组件或服务之间传递消息的中间件。其核心机制是生产者将消息发送到队列,消费者从队列中取出消息进行处理。这种模式实现了系统间的异步通信和解耦。
Redis消息队列
Redis不仅是一个高性能的内存数据库,还提供了多种数据结构(如List、Pub/Sub、Stream)来实现轻量级的消息队列。
1. 基于List的简单队列
- 核心命令:
- LPUSH / RPUSH:生产者向列表左/右侧插入消息。
- BRPOP / BLPOP:消费者阻塞式从列表右/左侧取出消息。
- 特点:
- 简单高效,支持阻塞等待。
- 消息只能被消费一次,无确认机制(消息取出后即删除,若处理失败会丢失)。
- 示例:
# 生产者发送消息
LPUSH my_queue "message1"
# 消费者阻塞等待消息(超时时间10秒)
BRPOP my_queue 10
2. 发布订阅模式(Pub/Sub)
- 核心命令:
- PUBLISH:发布消息到频道。
- SUBSCRIBE:订阅频道接收消息。
- 特点:
- 支持广播,实时性高。
- 消息无持久化(订阅者离线时丢失消息),无法回溯历史消息。
- 示例:
# 订阅者订阅频道
SUBSCRIBE my_channel
# 生产者发布消息
PUBLISH my_channel "Hello, World!"
3. 基于Streams的可靠队列(推荐)
Redis 5.0+ 支持 Streams 数据类型,提供更完整的消息队列功能:
消息持久化:消息存储在 Stream 中,可重复读取。
消费者组(Consumer Group):支持多消费者协同处理,消息确认机制(ACK)。
Pending Entries List (PEL):跟踪处理中的消息,避免丢失。
- 核心命令:
- XADD:添加消息到 Stream。
- XREADGROUP:消费者组读取消息。
- XACK:确认消息处理完成。
- 示例:
# 创建消息流
XADD mystream * field1 "value1" field2 "value2"
# 创建消费者组
XGROUP CREATE mystream mygroup 0
# 消费者从组内读取消息(阻塞模式)
XREADGROUP GROUP mygroup consumer1 BLOCK 1000 COUNT 1 STREAMS mystream >
# 处理完成后确认消息
XACK mystream mygroup 1629164893000-0
3.1 单消费者模式(Single Consumer)
- 定义:单个消费者直接从一个 Stream 中读取消息,每条消息只能被消费一次。
- 特点:
- 独立消费:消费者通过
XREAD
命令主动拉取消息。 - 无状态管理:需手动记录已处理消息的ID,避免重复消费。
- 简单轻量:适合单实例、低并发的场景。
- 独立消费:消费者通过
- 潜在问题:
- 消息重复消费:若消费者崩塌,重启后需手动记录上次处理的位置(通过LASTID)
- 无负载均衡:无法通过多实例提升处理能力。
- 示例:
# 生产者发送消息到 Stream
XADD mystream * field1 "value1" field2 "value2"
# 消费者读取消息(阻塞式,最多读取2条)
XREAD COUNT 2 BLOCK 5000 STREAMS mystream 0
# 输出:
# 1) 1) "mystream"
# 2) 1) 1) "1678459545000-0"
# 2) 1) "field1" 2) "value1" 3) "field2" 4) "value2"
3.2 消费者组模式(Consumer Group)
- 定义:多个消费者组成一个逻辑组,协同消费同一个 Stream 中的消息。每条消息仅被组内一个消费者处理。
- 核心机制:
- 消息分配:Redis 自动将消息分发给组内不同的消费者。
- 故障转移:若某个消费者宕机,未确认(Pending)的消息会被重新分配给其他消费者。
- 消息确认(ACK):消费者处理完成后需发送ACK,否则消息会重新入队。
- 核心优势:
- 负载均衡:组内多个消费者并行处理消息,提升吞吐量。
- 消息可靠性:通过 ACK 机制和 Pending Entries List(PEL)确保消息必达。
- 自动重试:未确认的消息会被重新分配给其他消费者。
- 示例:
# 创建消费者组(关联到 Stream)
XGROUP CREATE mystream mygroup 0
# 消费者加入组并消费消息(消费者1)
XREADGROUP GROUP mygroup consumer1 COUNT 1 BLOCK 5000 STREAMS mystream >
# 输出:
# 1) 1) "mystream"
# 2) 1) 1) "1678459545000-0"
# 2) 1) "field1" 2) "value1" 3) "field2" 4) "value2"
# 处理完成后发送ACK确认
XACK mystream mygroup 1678459545000-0
# 另一个消费者加入组(消费者2)
XREADGROUP GROUP mygroup consumer2 COUNT 1 BLOCK 5000 STREAMS mystream >
消费者组配置示例
# 创建消费者组(若Stream不存在则自动创建)
XGROUP CREATE mystream mygroup $ MKSTREAM
# 消费者1加入组并消费消息
XREADGROUP GROUP mygroup consumer1 COUNT 10 BLOCK 5000 STREAMS mystream >
# 消费者2加入组并消费消息
XREADGROUP GROUP mygroup consumer2 COUNT 10 BLOCK 5000 STREAMS mystream >
# 查看未确认的消息(Pending List)
XPENDING mystream mygroup
实现业务:
修改代码:
1. 在redis中创建消息队列
# 创建队列(消费者组模式)
XGROUP CREATE stream.orders g1 0 MKSTREAM
2. 修改lua脚本
-- 1.参数列表
-- 优惠券id
local voucherId = ARGV[1]
-- 用户id
local userId = ARGV[2]
-- 订单id
local orderId = ARGV[3]
-- 2.数据key
-- 库存key
local stockKey = 'seckill:stock:' .. voucherId
-- 订单key
local orderKey = 'seckill:order:' .. voucherId
-- 3.脚本业务逻辑
-- 判断库存是否充足 get stockKey
if(tonumber(redis.call('get', stockKey)) <= 0) then
-- 库存不足,返回1
return 1
end
-- 判断用户是否已下单 SISMEMBER orderKey userId
if(redis.call('sismember', orderKey, userId) == 1) then
-- 用户重复下单,返回2
return 2
end
-- 扣减库存,并判断扣减是否成功 INCRBY stockKey -1
redis.call("incrby", stockKey, -1)
-- 下单,保存用户id到集合 SADD orderKey userId
redis.call("sadd", orderKey, userId)
-- 发送消息到队列中 XADD stream.orders * k1 v1 k2 v2...
redis.call("xadd", "stream.orders", "*", "userId", userId, "voucherId", voucherId, "id", orderId)
return 0
3.修改业务代码,实现消息队列异步优化
/**
* 优惠券订单任务处理线程:不断从消息队列中获取消息,并创建订单
*/
private class VoucherOrderHandler implements Runnable {
/**
* 队列名称
*/
String queueName = "stream.orders";
@Override
public void run() {
while (true) {
try {
//1.获取消息队列中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS stream.orders >(‘>’代表未消费)
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
StreamOffset.create(queueName, ReadOffset.lastConsumed())
);
//2.判断消息获取是否成功
if (list == null || list.isEmpty()) {
//2.1如果获取失败,说明没有消息,继续下一次循环获取消息
continue;
}
//3.如果获取成功,解析消息中的订单信息
//将消息转成VoucherOrder对象
MapRecord<String, Object, Object> record = list.get(0);
Map<Object, Object> values = record.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);
//3.1 创建订单完成下单
handleVoucherOrder(voucherOrder);
//4.ACK确认 SACK stream.orders g1 id
stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());
} catch (Exception e) {
//处理异常消息
log.error("处理订单异常", e);
handlePendingList();
}
}
}
/**
* 处理pending-list中的消息
*/
private void handlePendingList() {
while (true) {
try {
//1.获取pending-list中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 STREAMS stream.orders 0('0'代表已消费但未确认)
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1),
StreamOffset.create(queueName, ReadOffset.from("0"))
);
//2.判断消息获取是否成功
if (list == null || list.isEmpty()) {
//2.1如果获取失败,说明pending-list中没有异常消息,直接结束循环
break;
}
//3.如果获取成功,解析消息中的订单信息
//将消息转成VoucherOrder对象
MapRecord<String, Object, Object> record = list.get(0);
Map<Object, Object> values = record.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);
//3.1 创建订单完成下单
handleVoucherOrder(voucherOrder);
//4.ACK确认 SACK stream.orders g1 id
stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());
} catch (Exception e) {
//处理异常消息
log.error("处理pending-list中的订单信息失败,{}", e.getMessage());
try {
Thread.sleep(20);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
}
}
}