【Redis】使用阻塞队列+Redis优化秒杀业务
文章目录
- 【Redis】使用阻塞队列+Redis优化秒杀业务
- 1. 为什么要优化
- 2. 怎么优化
- 2.1 查询优惠卷
- 2.2 判断秒杀库存
- 2.3 校验一人一单
- 2.4 减库存
- 2.5 创建订单
- 2.6 保证redis操作的原子性
- 3. 确认优化方案
- 4. 实现优化方案
- 4.1 编写lua脚本
- 4.2 定义阻塞队列和线程池
- 4.3 定义内部类实现 `Runnable` 接口处理阻塞队列
- 4.4 定义方法执行下单任务
- 4.5 主方法实现
- 5. 怎样才算优化成功
在对业务进行优化之前,我们需要了解以下几点:
- 为什么要优化
- 怎么优化
- 怎么才算优化成功
我们下面也围绕这几点来讲述。
1. 为什么要优化
假设一个场景:
一个电商平台,商家推出热门产品的限量优惠券,一人只能下一单。
最简单的业务流程如下:
首先查询优惠券再判断秒杀库存然后查询订单…一步接着一步,整个业务的响应时间就是每步操作所花时间之和,我们将这种形式称为 同步
。而且基本每个操作都要查询数据库。我们也知道查询数据库的时间不算快并且当并发量比较大时对数据库也不友好。所以我们需要对其进行优化。
2. 怎么优化
怎样优化原来的业务,我认为首先就得画出原来业务的流程图,根据流程图具体分析哪一步可以进行优化。我们的流程图已经在上面给出。
2.1 查询优惠卷
这里的优惠券查询直接查询了数据库,我们直到数据库的查询效率是不如redis的,所以我们可以将优惠券的信息添加到redis中(在发布优惠券的时候就要添加到redis中),每次都查询redis的数据,这样就提高了查询效率。
2.2 判断秒杀库存
和查询优惠券一样,我们已经已经说了要将优惠券信息添加到redis中,那么我们就可以根据从redis中查询出的数据进行秒杀判断。那么问题来了,我们添加到redis的优惠券信息到底是啥?是一整个优惠券对象吗?不是,而是优惠券的库存数量。至于key的值,完全根据实际业务决定。下面给出redis存储优惠券的示例格式:
2.3 校验一人一单
校验一人一单操作的优化不要我说也知道该干嘛了吧,没错!放到redis中查询,那么我们把什么数据放到redis中存储呢?**这里是一个重点!**选择的一个合适的存储格式及其重要。因为我们要求一人只能下一单,我们应该能够想到Set集合,set集合的特性不用我多说。set集合中存储下单用户的id,我们就以set集合的形式存入redis中,key可以由优惠卷的id组成,存储格式如下所示:
2.4 减库存
校验一人一单发现该用户没下过单,我们就可以去减库存,减库存的操作我们也放到redis中执行,我们可以使用redis自增的操作来实现扣减库存的操作。
2.5 创建订单
我们原来的业务流程是 同步
的,我们可以将它改造成 异步
的,这样就能够大大节省时间。而创建订单的操作正适合改造成异步操作。我们将订单对象放入一个阻塞队列中,让独立线程去处理阻塞队列中的订单对象。
2.6 保证redis操作的原子性
2.1至2.5的操作都是分别对redis进行操作,在并发的情况下万一某一步的redis操作因为某些原因阻塞了,容易出现线程安全问题。为避免线程安全问题的发生,我们应该确保redis操作的原子性。这里可以选择使用lua脚本来却本redis操作的原子性。
3. 确认优化方案
在上面我们已经分析了各个流程当中可优化的点,那么我们接下来就可以根据上面的分析来重新设计业务流程图,优化之后的业务流程图如下:
4. 实现优化方案
4.1 编写lua脚本
根据给出的lua脚本流程图编写lua脚本:
-- 1.参数列表
-- 1.1.优惠券id
local voucherId = ARGV[1]
-- 1.2.用户id
local userId = ARGV[2]
-- 2.数据key
-- 2.1.库存key
local stockKey = 'seckill:stock:' .. voucherId
-- 2.2.订单key
local orderKey = 'seckill:order:' .. voucherId
-- 3.脚本业务
-- 3.1.判断库存是否充足 get stockKey
if (tonumber(redis.call('get', stockKey)) <= 0) then
-- 3.2.库存不足,返回1
return 1
end
-- 3.2.判断用户是否下单 SISMEMBER orderKey userId
if (redis.call('sismember', orderKey, userId) == 1) then
-- 3.3.存在,说明是重复下单,返回2
return 2
end
-- 3.4.扣库存 incrby stockKey -1
redis.call('incrby', stockKey, -1)
-- 3.5.下单(保存用户)sadd orderKey userId
redis.call('sadd', orderKey, userId)
return 0
4.2 定义阻塞队列和线程池
//阻塞队列
private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024 * 1024);
//线程池
private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();
4.3 定义内部类实现 Runnable
接口处理阻塞队列
定义一个内部类,编写“下单任务”去执行创建订单的操作。线程池中的独立线程去执行这个任务,从阻塞队列中取出订单,然后创建订单。
private class VoucherOrderHndler implements Runnable {
@Override
public void run() {
while (true) {
try {
//1.获取队列中的订单信息
VoucherOrder voucherOrder = orderTasks.take();
//2.创建订单
handleVoucherOrder(voucherOrder);
} catch (Exception e) {
log.error("处理订单异常", e);
}
}
}
}
handleVoucherOrder(voucherOrder)
方法定义在外部类上,具体业务如下:
虽然我们使用lua脚本已经确保了一人一单,但是我们还可以使用 redisson
去加锁进行兜底。然后我们使用代理对象去调用创建订单的方法。
但是我们这里使用线程池中的独立线程去处理订单,该线程和我们主方法中使用的线程不是同一个,所以在该方法中我们通过 IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
我们无法获得主方法中的代理对象,所以该方法中的 proxy
对象,是我们在外部类中通过成员变量声明并在主方法中赋值的,具体操作继续往下看。
private void handleVoucherOrder(VoucherOrder voucherOrder) {
Long userId = voucherOrder.getUserId();
RLock lock = redissonClient.getLock("lock:order:" + userId);
if (!lock.tryLock()) {
//获取锁失败
log.error("不允许重复下单");
}
try {
//获取代理对象(事务)
proxy.createVoucherOrder(voucherOrder);
} finally {
//释放锁
lock.unlock();
}
}
createVoucherOrder(voucherOrder)
的方法如下所示:
@Transactional
public void createVoucherOrder(VoucherOrder voucherOrder) {
//4.一人一单
Long userId = voucherOrder.getUserId();
//4.1.查询是否已经有订单
int count = lambdaQuery().eq(VoucherOrder::getUserId, userId).eq(VoucherOrder::getVoucherId, voucherOrder.getVoucherId()).count();
if (count > 0) {
//已有订单
log.error("用户已经购买过一次!");
return;
}
//5.扣减库存
//5.1.写法一
LambdaUpdateWrapper<SeckillVoucher> updateWrapper = new LambdaUpdateWrapper<>();
updateWrapper.setSql("stock=stock-1").eq(SeckillVoucher::getVoucherId, voucherOrder.getVoucherId()).gt(SeckillVoucher::getStock, 0);
boolean success = seckillVoucherService.update(updateWrapper);
if (!success) {
//扣减失败
log.error("库存不足");
return;
}
//创建订单
save(voucherOrder);
}
4.4 定义方法执行下单任务
我们要实现上面定义的下单任务在外部类刚初始化完成就执行。我们需要使用到 @PostConstruct
注解,该注解的功能是在类初始化后就执行。
//这个类刚初始化后就去执行这个任务
@PostConstruct
private void init() {
SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHndler());
}
4.5 主方法实现
seckillVoucher(Long voucherId)
就是真正处理秒杀业务的方法。
private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
static {
SECKILL_SCRIPT = new DefaultRedisScript<>();
SECKILL_SCRIPT.setResultType(Long.class);
SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
}
private IVoucherOrderService proxy;
@Override
public Result seckillVoucher(Long voucherId) {
//获取用户id
Long userId = UserHolder.getUser().getId();
//1.执行lua脚本
Long result = stringRedisTemplate.execute(SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(), userId.toString());
//2.判断结果是否为0
int r = result.intValue();
if (r != 0) {
//2.不为0,没有购买资格
return Result.fail(r == 1 ? "库存不足" : "不能重读下单");
}
//2.2 为0,有购买资格,把下单信息保存到阻塞队列
VoucherOrder voucherOrder = new VoucherOrder();
//2.3 订单id
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
voucherOrder.setUserId(UserHolder.getUser().getId());
voucherOrder.setVoucherId(voucherId);
//2.4放入阻塞队列
orderTasks.add(voucherOrder);
//3.获取代理对象
proxy = (IVoucherOrderService) AopContext.currentProxy();
return Result.ok(orderId);
}
5. 怎样才算优化成功
判断优化是否成功的一个最为直观的指标就是响应时间,我们可以通过jMeter工具去进行并发测试,得到优化后的聚合报告,并与优化之前的聚合报告相比较。如果优化后的聚合报告中各指标基本都优于优化前的指标,那么就算优化成功了。