跟接上文:
并发情况下的库存扣减
一 库存扣减
public DefaultTreeFactory.TreeActionEntity logic(String userId, Long strategyId, Integer awardId, String ruleValue, Date endDateTime) {
log.info("规则过滤-库存扣减 userId:{} strategyId:{} awardId:{}", userId, strategyId, awardId);
// 扣减库存
Boolean status = strategyDispatch.subtractionAwardStock(strategyId, awardId, endDateTime);
// true;库存扣减成功,TAKE_OVER 规则节点接管,返回奖品ID,奖品规则配置
if (status) {
log.info("规则过滤-库存扣减-成功 userId:{} strategyId:{} awardId:{}", userId, strategyId, awardId);
// 写入延迟队列,延迟消费更新数据库记录。【在trigger的job;UpdateAwardStockJob 下消费队列,更新数据库记录】
strategyRepository.awardStockConsumeSendQueue(StrategyAwardStockKeyVO.builder()
.strategyId(strategyId)
.awardId(awardId)
.build());
// 注意;根据数据库表中配置走不同的节点。目前数据库配置 ALLOW 是走到下一个节点。
return DefaultTreeFactory.TreeActionEntity.builder()
.ruleLogicCheckType(RuleLogicCheckTypeVO.TAKE_OVER)
.strategyAwardVO(DefaultTreeFactory.StrategyAwardVO.builder()
.awardId(awardId)
.awardRuleValue(ruleValue)
.build())
.build();
}
// 如果库存不足,则直接返回放行
log.warn("规则过滤-库存扣减-告警,库存不足。userId:{} strategyId:{} awardId:{}", userId, strategyId, awardId);
return DefaultTreeFactory.TreeActionEntity.builder()
.ruleLogicCheckType(RuleLogicCheckTypeVO.ALLOW)
.build();
}
public void awardStockConsumeSendQueue(StrategyAwardStockKeyVO strategyAwardStockKeyVO) {
String cacheKey = Constants.RedisKey.STRATEGY_AWARD_COUNT_QUERY_KEY;
RBlockingQueue<StrategyAwardStockKeyVO> blockingQueue = redisService.getBlockingQueue(cacheKey);
RDelayedQueue<StrategyAwardStockKeyVO> delayedQueue = redisService.getDelayedQueue(blockingQueue);
delayedQueue.offer(strategyAwardStockKeyVO, 3, TimeUnit.SECONDS);
}
二 定时任务扫描扣减数据库库存
@Slf4j
@Component()
public class UpdateAwardStockJob {
@Resource
private IRaffleStock raffleStock;
// @Scheduled(cron = "0/5 * * * * ?")
public void exec() {
try {
StrategyAwardStockKeyVO strategyAwardStockKeyVO = raffleStock.takeQueueValue();
if (null == strategyAwardStockKeyVO) return;
log.info("定时任务,更新奖品消耗库存 strategyId:{} awardId:{}", strategyAwardStockKeyVO.getStrategyId(), strategyAwardStockKeyVO.getAwardId());
raffleStock.updateStrategyAwardStock(strategyAwardStockKeyVO.getStrategyId(), strategyAwardStockKeyVO.getAwardId());
} catch (Exception e) {
log.error("定时任务,更新奖品消耗库存失败", e);
}
}
}
public StrategyAwardStockKeyVO takeQueueValue() throws InterruptedException {
String cacheKey = Constants.RedisKey.STRATEGY_AWARD_COUNT_QUERY_KEY;
RBlockingQueue<StrategyAwardStockKeyVO> destinationQueue = redisService.getBlockingQueue(cacheKey);
return destinationQueue.poll();
}
<update id="updateStrategyAwardStock" parameterType="cn.bugstack.infrastructure.persistent.po.StrategyAward">
update strategy_award
set award_count_surplus = award_count_surplus - 1
where strategy_id = #{strategyId} and award_id = #{awardId} and award_count_surplus > 0
</update>
三 redis库存扣减到0,直接清空队列并修改数据库库存为0
@Autowired
private RabbitTemplate rabbitTemplate;
public void publish(String topic, BaseEvent.EventMessage<?> eventMessage) {
try {
String messageJson = JSON.toJSONString(eventMessage);
rabbitTemplate.convertAndSend(topic, messageJson);
log.info("发送MQ消息 topic:{} message:{}", topic, messageJson);
} catch (Exception e) {
log.error("发送MQ消息失败 topic:{} message:{}", topic, JSON.toJSONString(eventMessage), e);
throw e;
}
}
@Slf4j
@Component
public class ActivitySkuStockZeroCustomer {
@Value("${spring.rabbitmq.topic.activity_sku_stock_zero}")
private String topic;
@Resource
private IRaffleActivitySkuStockService skuStock;
@RabbitListener(queuesToDeclare = @Queue(value = "${spring.rabbitmq.topic.activity_sku_stock_zero}"))
public void listener(String message) {
try {
log.info("监听活动sku库存消耗为0消息 topic: {} message: {}", topic, message);
// 转换对象
BaseEvent.EventMessage<Long> eventMessage = JSON.parseObject(message, new TypeReference<BaseEvent.EventMessage<Long>>() {
}.getType());
Long sku = eventMessage.getData();
// 更新库存
skuStock.clearActivitySkuStock(sku);
// 清空队列 「此时就不需要延迟更新数据库记录了」todo 清空时,需要设定sku标识,不能全部清空。
skuStock.clearQueueValue();
} catch (Exception e) {
log.error("监听活动sku库存消耗为0消息,消费失败 topic: {} message: {}", topic, message);
throw e;
}
}
}
<update id="clearActivitySkuStock" parameterType="java.lang.Long">
update raffle_activity_sku
set stock_count_surplus = 0, update_time = now()
where sku = #{sku}
</update>
@Override
public void clearQueueValue() {
String cacheKey = Constants.RedisKey.ACTIVITY_SKU_COUNT_QUERY_KEY;
RBlockingQueue<ActivitySkuStockKeyVO> destinationQueue = redisService.getBlockingQueue(cacheKey);
destinationQueue.clear();
}