大众点评项目 阻塞队列+异步处理 实现秒杀优化
- 需求:阻塞队列+异步处理 实现秒杀优化
- 为什么使用异步处理?
- 为什么使用阻塞队列?
- 为什么使用Lua?
- 业务逻辑及其实现
- 原有逻辑代码 / 优化后逻辑代码
- 完整优化业务代码
- 原有优化业务代码
- 总结
SpringCloud章节复习已经过去,新的章节Redis开始了,这个章节中将会回顾Redis实战项目 大众点评
主要依照以下几个原则
- 基础+实战的Demo和Coding上传到我的代码仓库
- 在原有基础上加入一些设计模式,stream+lamdba等新的糖
- 通过DeBug调试,进入组件源码去分析底层运行的规则和设计模式
代码会同步在我的gitee中去,觉得不错的同学记得一键三连求关注,感谢:
Redis优化-链接: RedisBlockQueueMethodProject
需求:阻塞队列+异步处理 实现秒杀优化
我们通过@PostConstruct开启线程池,一旦系统开启,直接进行处理,当有订单添加到阻塞队列,就可以异步处理响应,
首先,用户接收到 是否成功的信息;后面,数据库的操作将从子线程中得到执行
为什么使用异步处理?
请求异步处理,对用户更友好,响应更方便,可以定制流程化我们的操作;
这里 我们开启子线程处理了 数据的CRUD, 不需要得到返回, 从而使压测数据更好看
为什么使用阻塞队列?
- 一般的队列只能保证作为一个有限长度的缓冲区,如果超出了缓冲长度,就无法保留当前的任务了,阻塞队列通过阻塞可以保留住当前想要继续入队的任务。
- 阻塞队列可以保证任务队列中没有任务时阻塞获取任务的线程,使得线程进入wait状态,释放cpu资源。
- 阻塞队列自带阻塞和唤醒功能,不需要做额外处理,无任务执行时,线程池利用阻塞队列的take方法挂起,从而维持核心线程的存活,不至于一直占用CPU资源。
为什么使用Lua?
- Lua脚本在Redis中是原子执行的,执行过程中间不会插入其他命令
- Lua脚本可以帮助开发和运维人员创造出自己定制的命令,并可以将这些命令常驻在Redis内存中,实现复用的效果
- Lua脚本可以将多条命令一次性打包,有效地减少网络开销
总结下,就是Redis本身的事务不符合我们的要求,在很多场景下, 原生的指令并不友好;
Redis本身提供了multi关键字用来开启事务,exec用来关闭事务。
业务逻辑及其实现
业务需求:
我们想要每个 秒杀商品 的用户ID和 库存数量放到Redis,
这里每次查找就可以通过Redis直接进行处理,而非DB,这样是很友好的,能极大提升吞吐量
具体实现:
我们通过@PostConstruct开启线程池,一旦系统开启,直接进行处理,当有订单添加到阻塞队列,就可以异步处理响应,
首先,用户接收到 是否成功的信息;后面,数据库的操作将从子线程中得到执行
- 先将商品的库存保存到Redis中去
@Override
@Transactional
public void addSeckillVoucher(Voucher voucher) {
// 保存优惠券
save(voucher);
// 保存秒杀信息
SeckillVoucher seckillVoucher = new SeckillVoucher();
seckillVoucher.setVoucherId(voucher.getId());
seckillVoucher.setStock(voucher.getStock());
seckillVoucher.setBeginTime(voucher.getBeginTime());
seckillVoucher.setEndTime(voucher.getEndTime());
seckillVoucherService.save(seckillVoucher);
stringRedisTemplate.opsForValue().set(RedisConstants.SECKILL_STOCK_KEY+ voucher.getId(),
voucher.getStock().toString() );
}
- 通过Lua实现Redis事务,将原有的库存判断,打包成Lua脚本,处理更方便
-- 使用lua是为了实现redis的事务,实现可重用性
-- 1.参数列表
-- 1.1.优惠券id
local voucherId = ARGV[1]
-- 1.2.用户id
local userId = ARGV[2]
-- 1.3.订单id
local orderId = ARGV[3]
-- 2.数据key
-- 2.1.库存key
local stockKey = 'seckill:stock:' .. voucherId
-- 2.2.订单key
local orderKey = 'seckill:order:' .. voucherId
-- 3.脚本业务
-- 3.1.判断库存是否充足 get stockKey
if(tonumber(redis.call('get', stockKey)) <= 0) then
-- 3.2.库存不足,返回1
return 1
end
-- 3.2.判断用户是否下单 SISMEMBER orderKey userId
if(redis.call('sismember', orderKey, userId) == 1) then
-- 3.3.存在,说明是重复下单,返回2
return 2
end
-- 3.4.扣库存 incrby stockKey -1
redis.call('incrby', stockKey, -1)
-- 3.5.下单(保存用户)sadd orderKey userId
redis.call('sadd', orderKey, userId)
-- 3.6.发送消息到队列中, XADD stream.orders * k1 v1 k2 v2 ...
redis.call('xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId)
return 0
- 调用Lua脚本服务
private static DefaultRedisScript<Long> SECKILL_SCRIPT;
static{
SECKILL_SCRIPT = new DefaultRedisScript<>();
SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
SECKILL_SCRIPT.setResultType(Long.class);
}
- 判断Lua的返回值,进一步处理 用户 服务
@Override
public Result seckillVoucher(Long voucherId) {
Long userId = UserHolder.getUser().getId();
Long result = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(),
userId.toString()
);
int r = result.intValue();
if (r!=0){
return Result.fail(r==1?"库存不足":"不能重复下单");
}
}
- 开始进行异步处理
- 阻塞队列处理
- 封装订单
@Override
public Result seckillVoucher(Long voucherId) {
//阻塞队列
//封装订单
VoucherOrder voucherOrder = new VoucherOrder();
long orderId = redisIDProductor.nextId("order");
voucherOrder.setId(orderId).setUserId(userId).setVoucherId(voucherId);
// save(voucherOrder);
currentProxy = (IVoucherOrderService) AopContext.currentProxy();
//异步执行减库存
orderTask.add(voucherOrder);
return Result.ok(orderId);
}
- (1)阻塞队列处理
现在 执行orderTask.add(voucherOrder);
通过线程池ExecutorService
来提交任务,进行处理
这里的@PostConstruct
是让Bean一初始化就执行,BlockingQueue
无任务,就会阻塞,使得线程进入wait状态,释放cpu资源。
无任务执行时,线程池利用阻塞队列的take方法挂起,从而维持核心线程的存活,不至于一直占用CPU资源。
private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();
private BlockingQueue<VoucherOrder> orderTask = new ArrayBlockingQueue<>(1024*1024);
@PostConstruct
private void init() {
log.debug("SECKILL_ORDER_EXECUTOR: work...");
//通过线程池`ExecutorService `来提交任务,进行处理
SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}
//异步处理阻塞队列中的任务
private class VoucherOrderHandler implements Runnable{
@Override
public void run() {
while(true){
try {
//无任务执行时,线程池利用阻塞队列的take方法挂起,从而维持核心线程的存活,不至于一直占用CPU资源。
VoucherOrder voucherOrder = orderTask.take();
handlerVoucherOrder(voucherOrder);
} catch (Exception e) {
// e.printStackTrace();
log.error("订单异常" + e);
}
}
}
}
- (2)异步处理CRUD操作
//需要在主线程中获得实际代理
private IVoucherOrderService currentProxy;
private void handlerVoucherOrder(VoucherOrder voucherOrder){
log.debug("voucherOrder: work..." + voucherOrder.toString());
Long userId = voucherOrder.getUserId();
//这里还有个分布式锁Redission,后面实现 暂时用Sy来代替
//intern()是去常量池中去找userId,处理锁失效,事务失效
synchronized(userId.toString().intern()) {
try {
/**
* 这里不能IVoucherOrderService currentProxy = (IVoucherOrderService) AopContext.currentProxy();
* 原因 private static final ThreadLocal<Object> currentProxy = new NamedThreadLocal<>("Current AOP proxy");
* 子线程无法从ThreadLocal中拿到想要的数据的
*/
currentProxy.createVoucherOrder(voucherOrder);
} catch (Exception e) {
e.printStackTrace();
}
//调用的this.create方法,没有实际对象,是事务失效的几种情况之一,所以需要找代理对象来实现
}
}
@Transactional
public void createVoucherOrder(VoucherOrder voucherOrder) {
log.debug("createVoucherOrder: work..." + voucherOrder.toString());
/**
* 实现一人一单 Long userId = UserHolder.getUser().getId();
* //由于是异步,createVoucherOrder是通过Proxy进行调用的,
* 子线程无法从ThreadLocal中拿到想要的数据的,所以必须通过Order来获得
*/
Long userId = voucherOrder.getUserId();
Long voucherId = voucherOrder.getVoucherId();
int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
if (count > 0) {
log.error("用户已经购买过一次");
return ;
}
//验证结束,扣减库存
boolean flag = seckillVoucherService.update()
.setSql("stock = stock - 1")
.eq("voucher_id", voucherId)
.gt("stock", 0)//CAS: 通过设定判断库存数量来进行,适合更新数据使用
.update();
if (!flag) {
log.error("库存不足");
return ;
}
save(voucherOrder);
}
}
- 执行命令操作
查看缓存
查看数据库
原有逻辑代码 / 优化后逻辑代码
完整优化业务代码
@Service
@Slf4j
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
@Resource
private ISeckillVoucherService seckillVoucherService;
@Resource
private RedisIDProductor redisIDProductor;
@Resource
private StringRedisTemplate stringRedisTemplate;
private BlockingQueue<VoucherOrder> orderTask = new ArrayBlockingQueue<>(1024*1024);
private static DefaultRedisScript<Long> SECKILL_SCRIPT;
static{
SECKILL_SCRIPT = new DefaultRedisScript<>();
SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
SECKILL_SCRIPT.setResultType(Long.class);
}
//需要在主线程中获得实际代理
private IVoucherOrderService currentProxy;
private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();
@PostConstruct
private void init() {
log.debug("SECKILL_ORDER_EXECUTOR: work...");
SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}
//异步处理阻塞队列中的任务
private class VoucherOrderHandler implements Runnable{
@Override
public void run() {
while(true){
try {
VoucherOrder voucherOrder = orderTask.take();
handlerVoucherOrder(voucherOrder);
} catch (Exception e) {
// e.printStackTrace();
log.error("订单异常" + e);
}
}
}
}
private void handlerVoucherOrder(VoucherOrder voucherOrder){
log.debug("voucherOrder: work..." + voucherOrder.toString());
Long userId = voucherOrder.getUserId();
//这里还有个分布式锁Redission,后面实现 暂时用Sy来代替
//intern()是去常量池中去找userId,处理锁失效,事务失效
synchronized(userId.toString().intern()) {
try {
/**
* 这里不能IVoucherOrderService currentProxy = (IVoucherOrderService) AopContext.currentProxy();
* 原因 private static final ThreadLocal<Object> currentProxy = new NamedThreadLocal<>("Current AOP proxy");
* 子线程无法从ThreadLocal中拿到想要的数据的
*/
currentProxy.createVoucherOrder(voucherOrder);
} catch (Exception e) {
e.printStackTrace();
}
//调用的this.create方法,没有实际对象,是事务失效的几种情况之一,所以需要找代理对象来实现
}
}
@Override
public Result seckillVoucher(Long voucherId) {
Long userId = UserHolder.getUser().getId();
Long result = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(),
userId.toString()
);
int r = result.intValue();
if (r!=0){
return Result.fail(r==1?"库存不足":"不能重复下单");
}
//阻塞队列
//封装订单
VoucherOrder voucherOrder = new VoucherOrder();
long orderId = redisIDProductor.nextId("order");
voucherOrder.setId(orderId).setUserId(userId).setVoucherId(voucherId);
// save(voucherOrder);
currentProxy = (IVoucherOrderService) AopContext.currentProxy();
//异步执行减库存
orderTask.add(voucherOrder);
return Result.ok(orderId);
}
//异步开启事务
@Transactional
public void createVoucherOrder(VoucherOrder voucherOrder) {
log.debug("createVoucherOrder: work..." + voucherOrder.toString());
/**
* 实现一人一单 Long userId = UserHolder.getUser().getId();
* //由于是异步,createVoucherOrder是通过Proxy进行调用的,
* 子线程无法从ThreadLocal中拿到想要的数据的,所以必须通过Order来获得
*/
Long userId = voucherOrder.getUserId();
Long voucherId = voucherOrder.getVoucherId();
int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
if (count > 0) {
log.error("用户已经购买过一次");
return ;
}
//验证结束,扣减库存
boolean flag = seckillVoucherService.update()
.setSql("stock = stock - 1")
.eq("voucher_id", voucherId)
.gt("stock", 0)//CAS: 通过设定判断库存数量来进行,适合更新数据使用
.update();
if (!flag) {
log.error("库存不足");
return ;
}
save(voucherOrder);
}
}
原有优化业务代码
/**
* 这里是通过乐观锁/悲观锁实现的一人一单,秒杀功能
* @param voucherId
* @return
*/
public Result seckillVoucher1(Long voucherId) {
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
return Result.fail("秒杀尚未开始");
}
if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
return Result.fail("秒杀已经结束");
}
if (voucher.getStock() < 1) {
return Result.fail("库存不足");
}
//实现一人一单
Long userId = UserHolder.getUser().getId();
//intern()是去常量池中去找userId,处理锁失效,事务失效
synchronized(userId.toString().intern()) {
IVoucherOrderService currentProxy = (IVoucherOrderService) AopContext.currentProxy();
return currentProxy.createVoucherOrder(voucherId);
*//*调用的this.create方法,没有实际对象,是事务失效的几种情况之一,所以需要找代理对象来实现
return createVoucherOrder(voucherId);*//*
}
}
@Transactional
public Result createVoucherOrder1(Long voucherId) {
//实现一人一单
Long userId = UserHolder.getUser().getId();
int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
if (count > 0) {
return Result.fail("已经购买");
}
//验证结束,扣减库存
boolean flag = seckillVoucherService.update()
.setSql("stock = stock - 1")
.eq("voucher_id", voucherId)
.gt("stock", 0)//CAS: 通过设定判断库存数量来进行,适合更新数据使用
.update();
if (!flag) {
return Result.fail("库存不足");
}
//封装订单
VoucherOrder voucherOrder = new VoucherOrder();
long orderId = redisIDProductor.nextId("order");
//订单Id、用户Id、优惠券Id
voucherOrder.setId(orderId).setUserId(userId).setVoucherId(voucherId);
save(voucherOrder);
return Result.ok(orderId);
}
总结