异步领券
优化方案分析
对于高并发问题,优化的思路有异步写和合并写。
其中,合并写请求比较适合应用在写频率较高,写数据比较简单的场景。而异步写则更适合应用在业务比较复杂,业务链较长的场景。
显然,领券业务更适合使用异步写方案。
思路分析与设计
不过这里存在一个问题:
并不是每一个用户都有领券资格,具体要校验了资格才知道。那我们在发送MQ消息后,就要返回给用户结果了,此时该告诉用户是领券成功还是失败呢?
显然,无论告诉他哪种结果都不一定正确。因此,我们应该将校验领券资格的逻辑前置,在校验完成后再发MQ消息,完成数据库写操作:
方案进一步改进:
但是,校验领券资格的部分依然会有多次数据库查询,还需要加锁。效率提升并不明显,怎么办?
为了进一步提高效率,我们可以把优惠券相关数据缓存到Redis中,这样就可以基于Redis完成资格校验
优惠券缓存
缓存内容
优惠券资格校验需要校验的内容包括:
-
优惠券发放时间
-
优惠券库存
-
用户限领数量
因此,为了减少对Redis内存的消耗,在构建优惠券缓存的时候,我们并不需要把所有优惠券信息写入缓存,而是只保存上述字段即可。
注意!!!!
既然要在缓存中保存优惠券库存,并且校验库存是否充足。那就必须在每次校验通过后,立刻扣减Redis中缓存的库存,否则缓存中库存一直不变,起不到校验是否超发的目的。
缓存数据结构
为了便于我们修改缓存中的库存数据,这里建议采用Hash结构,将库存作为Hash的一个字段,将来只需要通过HINCRBY
命令即可修改。
Redis中的数据结构大概如图:
KEY(couponId) | field | value |
---|---|---|
couponId:10 | issueBeginTime | 20230327 |
issueEndTime | 20230501 | |
totalNum | 100 | |
userLimit | 1 | |
couponId:20 | issueBeginTime | 20230827 |
issueEndTime | 20230901 | |
totalNum | 200 | |
userLimit | 2 |
上述结构中记录了券的每人限领数量:userLimit , 但是用户已经领取的数量并没有记录
一个券可能被多个用户领取,每个用户的已领取数量都需要记录。显然,还是Hash结构更加适合:
KEY(couponId) | field(userId) | value(count) |
---|---|---|
couponId:10 | uid:110 | 1 |
uid:120 | 1 | |
uid:130 | 1 | |
uid:140 | 1 |
缓存KEY前缀
注意!!!
优惠券的缓存该何时添加呢?
优惠券一旦发放,就可能有用户来领券,因此应该在发放优惠券的同时直接添加优惠券缓存。而暂停发放时则应该将优惠券的缓存删除,下次再次发放时重新添加。
添加缓存
private final StringRedisTemplate redisTemplate;
@Transactional
@Override
public void beginIssue(CouponIssueFormDTO dto) {
// 1.查询优惠券
Coupon coupon = getById(dto.getId());
if (coupon == null) {
throw new BadRequestException("优惠券不存在!");
}
// 2.判断优惠券状态,是否是暂停或待发放
if(coupon.getStatus() != CouponStatus.DRAFT && coupon.getStatus() != PAUSE){
throw new BizIllegalException("优惠券状态错误!");
}
// 3.判断是否是立刻发放
LocalDateTime issueBeginTime = dto.getIssueBeginTime();
LocalDateTime now = LocalDateTime.now();
boolean isBegin = issueBeginTime == null || !issueBeginTime.isAfter(now);
// 4.更新优惠券
// 4.1.拷贝属性到PO
Coupon c = BeanUtils.copyBean(dto, Coupon.class);
// 4.2.更新状态
if (isBegin) {
c.setStatus(ISSUING);
c.setIssueBeginTime(now);
}else{
c.setStatus(UN_ISSUE);
}
// 4.3.写入数据库
updateById(c);
// 5.添加缓存,前提是立刻发放的
if (isBegin) {
coupon.setIssueBeginTime(c.getIssueBeginTime());
coupon.setIssueEndTime(c.getIssueEndTime());
cacheCouponInfo(coupon);
}
// 6.判断是否需要生成兑换码,优惠券类型必须是兑换码,优惠券状态必须是待发放
if(coupon.getObtainWay() == ObtainType.ISSUE && coupon.getStatus() == CouponStatus.DRAFT){
coupon.setIssueEndTime(c.getIssueEndTime());
codeService.asyncGenerateCode(coupon);
}
}
private void cacheCouponInfo(Coupon coupon) {
// 1.组织数据
Map<String, String> map = new HashMap<>(4);
map.put("issueBeginTime", String.valueOf(DateUtils.toEpochMilli(coupon.getIssueBeginTime())));
map.put("issueEndTime", String.valueOf(DateUtils.toEpochMilli(coupon.getIssueEndTime())));
map.put("totalNum", String.valueOf(coupon.getTotalNum()));
map.put("userLimit", String.valueOf(coupon.getUserLimit()));
// 2.写缓存
redisTemplate.opsForHash().putAll(PromotionConstants.COUPON_CACHE_KEY_PREFIX + coupon.getId(), map);
}
移除缓存
@Override
@Transactional
public void pauseIssue(Long id) {
// 1.查询旧优惠券
Coupon coupon = getById(id);
if (coupon == null) {
throw new BadRequestException("优惠券不存在");
}
// 2.当前券状态必须是未开始或进行中
CouponStatus status = coupon.getStatus();
if (status != UN_ISSUE && status != ISSUING) {
// 状态错误,直接结束
return;
}
// 3.更新状态
boolean success = lambdaUpdate()
.set(Coupon::getStatus, PAUSE)
.eq(Coupon::getId, id)
.in(Coupon::getStatus, UN_ISSUE, ISSUING)
.update();
if (!success) {
// 可能是重复更新,结束
log.error("重复暂停优惠券");
}
// 4.删除缓存
redisTemplate.delete(PromotionConstants.COUPON_CACHE_KEY_PREFIX + id);
}
实现异步领券
根据前面的思路分析:
实现异步领券分为两步:
-
改造领券逻辑,实现基于Redis的领取资格校验,然后发送MQ消息
-
编写MQ监听器,监听到消息后执行领券逻辑
定义MQ消息规范
MQ消息通信规范如下:
参数 | 说明 | ||
---|---|---|---|
Exchange | promotion.topic | ||
Routing-Key | coupon:receive | ||
Message | 参数名 | 类型 | 说明 |
userId | Long | 用户id | |
couponId | Long | 优惠券id |
基于Redis的领取资格校验
@Override
@Lock(name = "lock:coupon:#{couponId}")
public void receiveCoupon(Long couponId) {
// 1.查询优惠券
Coupon coupon = queryCouponByCache(couponId);
if (coupon == null) {
throw new BadRequestException("优惠券不存在");
}
// 2.校验发放时间
LocalDateTime now = LocalDateTime.now();
if (now.isBefore(coupon.getIssueBeginTime()) || now.isAfter(coupon.getIssueEndTime())) {
throw new BadRequestException("优惠券发放已经结束或尚未开始");
}
// 3.校验库存
if (coupon.getIssueNum() >= coupon.getTotalNum()) {
throw new BadRequestException("优惠券库存不足");
}
Long userId = UserContext.getUser();
// 4.校验每人限领数量
// 4.1.查询领取数量
String key = PromotionConstants.USER_COUPON_CACHE_KEY_PREFIX + couponId;
Long count = redisTemplate.opsForHash().increment(key, userId.toString(), 1);
// 4.2.校验限领数量
if(count > coupon.getUserLimit()){
throw new BadRequestException("超出领取数量");
}
// 5.扣减优惠券库存
redisTemplate.opsForHash().increment(
PromotionConstants.COUPON_CACHE_KEY_PREFIX + couponId, "totalNum", -1);
// 6.发送MQ消息
UserCouponDTO uc = new UserCouponDTO();
uc.setUserId(userId);
uc.setCouponId(couponId);
mqHelper.send(MqConstants.Exchange.PROMOTION_EXCHANGE, MqConstants.Key.COUPON_RECEIVE, uc);
}
private Coupon queryCouponByCache(Long couponId) {
// 1.准备KEY
String key = PromotionConstants.COUPON_CACHE_KEY_PREFIX + couponId;
// 2.查询
Map<Object, Object> objMap = redisTemplate.opsForHash().entries(key);
if (objMap.isEmpty()) {
return null;
}
// 3.数据反序列化
return BeanUtils.mapToBean(objMap, Coupon.class, false, CopyOptions.create());
}
监听MQ并领券
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "coupon.receive.queue", durable = "true"),
exchange = @Exchange(name = PROMOTION_EXCHANGE, type = ExchangeTypes.TOPIC),
key = COUPON_RECEIVE
))
public void listenCouponReceiveMessage(UserCouponDTO uc){
userCouponService.checkAndCreateUserCoupon(uc);
}
// 移除了锁,这里不需要加锁了
@Transactional
@Override
public void checkAndCreateUserCoupon(UserCouponDTO uc) {
// 1.查询优惠券
Coupon coupon = couponMapper.selectById(uc.getCouponId());
if (coupon == null) {
throw new BizIllegalException("优惠券不存在!");
}
// 2.更新优惠券的已经发放的数量 + 1
int r = couponMapper.incrIssueNum(coupon.getId());
if (r == 0) {
throw new BizIllegalException("优惠券库存不足!");
}
// 3.新增一个用户券
saveUserCoupon(coupon, uc.getUserId());
// 4.更新兑换码状态
if (uc.getSerialNum()!= null) {
codeService.lambdaUpdate()
.set(ExchangeCode::getUserId, uc.getUserId())
.set(ExchangeCode::getStatus, ExchangeCodeStatus.USED)
.eq(ExchangeCode::getId, uc.getSerialNum())
.update();
}
}
异步的兑换码领券
思路分析
-
生成兑换码时,将优惠券及对应兑换码序列号的最大值缓存到Redis中
-
改造兑换优惠券的功能,利用Redis完成资格校验,然后发送MQ消息(消息体中要增加传递兑换码的序列号)
-
改造领取优惠券的MQ监听器,添加标记兑换码状态为已兑换的功能
缓存兑换码
生成兑换码时,将优惠券及对应兑换码序列号的最大值缓存到Redis中
@Override
@Async("generateExchangeCodeExecutor")
public void asyncGenerateCode(Coupon coupon) {
// 发放数量
Integer totalNum = coupon.getTotalNum();
// 1.获取Redis自增序列号
Long result = serialOps.increment(totalNum);
if (result == null) {
return;
}
int maxSerialNum = result.intValue();
List<ExchangeCode> list = new ArrayList<>(totalNum);
for (int serialNum = maxSerialNum - totalNum + 1; serialNum <= maxSerialNum; serialNum++) {
// 2.生成兑换码
String code = CodeUtil.generateCode(serialNum, coupon.getId());
ExchangeCode e = new ExchangeCode();
e.setCode(code);
e.setId(serialNum);
e.setExchangeTargetId(coupon.getId());
e.setExpiredTime(coupon.getIssueEndTime());
list.add(e);
}
// 3.保存数据库
saveBatch(list);
// 4.写入Redis缓存,member:couponId,score:兑换码的最大序列号
redisTemplate.opsForZSet().add(COUPON_RANGE_KEY, coupon.getId().toString(), maxSerialNum);
}
改造领券功能
改造兑换优惠券的功能,利用Redis完成资格校验,然后发送MQ消息(消息体中要增加传递兑换码的序列号)
@Override
@Lock(name = "lock:coupon:#{T(com.tianji.common.utils.UserContext).getUser()}")
public void exchangeCoupon(String code) {
// 1.校验并解析兑换码
long serialNum = CodeUtil.parseCode(code);
// 2.校验是否已经兑换 SETBIT KEY 4 1
boolean exchanged = codeService.updateExchangeMark(serialNum, true);
if (exchanged) {
throw new BizIllegalException("兑换码已经被兑换过了");
}
try {
// 3.查询兑换码对应的优惠券id
Long couponId = codeService.exchangeTargetId(serialNum);
if (couponId == null) {
throw new BizIllegalException("兑换码不存在!");
}
Coupon coupon = queryCouponByCache(couponId);
// 4.是否过期
LocalDateTime now = LocalDateTime.now();
if (now.isAfter(coupon.getIssueEndTime()) || now.isBefore(coupon.getIssueBeginTime())) {
throw new BizIllegalException("优惠券活动未开始或已经结束");
}
// 5.校验每人限领数量
Long userId = UserContext.getUser();
// 5.1.查询领取数量
String key = PromotionConstants.USER_COUPON_CACHE_KEY_PREFIX + couponId;
Long count = redisTemplate.opsForHash().increment(key, userId.toString(), 1);
// 5.2.校验限领数量
if(count > coupon.getUserLimit()){
throw new BadRequestException("超出领取数量");
}
// 6.发送MQ消息通知
UserCouponDTO uc = new UserCouponDTO();
uc.setUserId(userId);
uc.setCouponId(couponId);
uc.setSerialNum((int) serialNum);
mqHelper.send(MqConstants.Exchange.PROMOTION_EXCHANGE, MqConstants.Key.COUPON_RECEIVE, uc);
} catch (Exception e) {
// 重置兑换的标记 0
codeService.updateExchangeMark(serialNum, false);
throw e;
}
}
@Override
public Long exchangeTargetId(long serialNum) {
// 1.查询score值比当前序列号大的第一个优惠券
Set<String> results = redisTemplate.opsForZSet().rangeByScore(
COUPON_RANGE_KEY, serialNum, serialNum + 5000, 0L, 1L);
if (CollUtils.isEmpty(results)) {
return null;
}
// 2.数据转换
String next = results.iterator().next();
return Long.parseLong(next);
}
改造领取优惠券的MQ监听器,添加标记兑换码状态为已兑换的功能
// 移除了锁,这里不需要加锁了
@Transactional
@Override
public void checkAndCreateUserCoupon(UserCouponDTO uc) {
// 1.查询优惠券
Coupon coupon = couponMapper.selectById(uc.getCouponId());
if (coupon == null) {
throw new BizIllegalException("优惠券不存在!");
}
// 2.更新优惠券的已经发放的数量 + 1
int r = couponMapper.incrIssueNum(coupon.getId());
if (r == 0) {
throw new BizIllegalException("优惠券库存不足!");
}
// 3.新增一个用户券
saveUserCoupon(coupon, uc.getUserId());
// 4.更新兑换码状态
if (uc.getSerialNum()!= null) {
codeService.lambdaUpdate()
.set(ExchangeCode::getUserId, uc.getUserId())
.set(ExchangeCode::getStatus, ExchangeCodeStatus.USED)
.eq(ExchangeCode::getId, uc.getSerialNum())
.update();
}
}