一、库存解锁的场景
RabbitMQ库存解锁的场景有很多,以下是一些常见的场景:
-
订单取消和订单回滚。下订单成功,订单过期没有支付被系统自动取消、被用户手动取消。都要解锁库存。
-
下订单成功,库存锁定成功,接下来的业务调用失败,导致订单回滚;之前锁定的库存就要自动解锁。
订单系统需要通知库存系统,如果想要解锁库存,可以通过stock.locked路由键发送一个消息给交换机stock-event-exchange,消息内容包括哪个订单、哪些商品、多少库存
二、流程:
如图:
- 锁定库存成功,发送消息(库存工作单详情)给stock-event-exchange交换机,路由键stock.locked
- stock-event-exchange交换机将消息交给stock.delay.queue队列,路由键stock.locked,存活时间50分钟
- 50分钟后,stock.delay.queue队列队列的消息交给死信交换机stock-event-exchange,路由键stock.release
- 死信交换机stock-event-exchange将消息叫给stock.release.stock.queue死信队列,路由键stock.release
- 使用监听器监听stock.release.stock.queue死信队列,如果发生支付异常或者业务调用失败,导致订单回滚,进行库存解锁
三、RabbitMQ代码实现:
1. 主机、端口、属性配置
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
# 虚拟主机
virtual-host: /kmall
# 开启发送端发送确认,无论是否到达broker都会触发回调【发送端确认机制+本地事务表】
publisher-confirm-type: correlated
# 开启发送端抵达队列确认,消息未被队列接收时触发回调【发送端确认机制+本地事务表】
publisher-returns: true
# 消息在没有被队列接收时是否强行退回
template:
mandatory: true
# 消费者手动确认模式,关闭自动确认,否则会消息丢失
listener:
simple:
acknowledge-mode: manual
2. 开启rabbitMQ
// 开启rabbit
@EnableRabbit
// 开启feign
@EnableFeignClients(basePackages = "com.koo.modules.ware.feign")
// 开启服务注册功能
@EnableDiscoveryClient
@SpringBootApplication
public class WareApplication {
public static void main(String[] args) {
SpringApplication.run(WareApplication.class, args);
}
}
3. 配置rabbitMQ模板与消息格式
@Configuration
public class MyRabbitConfig {
@Primary
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
// TODO 封装RabbitTemplate
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(messageConverter());
initRabbitTemplate(rabbitTemplate);
return rabbitTemplate;
}
@Bean
public MessageConverter messageConverter() {
// 使用json序列化器来序列化消息,发送消息时,消息对象会被序列化成json格式
return new Jackson2JsonMessageConverter();
}
/**
* 定制RabbitTemplate
* 1、服务收到消息就会回调
* 1、spring.rabbitmq.publisher-confirms: true
* 2、设置确认回调
* 2、消息正确抵达队列就会进行回调
* 1、spring.rabbitmq.publisher-returns: true
* spring.rabbitmq.template.mandatory: true
* 2、设置确认回调ReturnCallback
* <p>
* 3、消费端确认(保证每个消息都被正确消费,此时才可以broker删除这个消息)
*/
//@PostConstruct // (MyRabbitConfig对象创建完成以后,执行这个方法)
public void initRabbitTemplate(RabbitTemplate rabbitTemplate) {
/**
* 发送消息触发confirmCallback回调
* @param correlationData:当前消息的唯一关联数据(如果发送消息时未指定此值,则回调时返回null)
* @param ack:消息是否成功收到(ack=true,消息抵达Broker)
* @param cause:失败的原因
*/
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
System.out.println("发送消息触发confirmCallback回调" +
"\ncorrelationData ===> " + correlationData +
"\nack ===> " + ack + "" +
"\ncause ===> " + cause);
System.out.println("=================================================");
});
/**
* 消息未到达队列触发returnCallback回调
* 只要消息没有投递给指定的队列,就触发这个失败回调
* @param message:投递失败的消息详细信息
* @param replyCode:回复的状态码
* @param replyText:回复的文本内容
* @param exchange:接收消息的交换机
* @param routingKey:接收消息的路由键
*/
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
// 需要修改数据库 消息的状态【后期定期重发消息】
System.out.println("消息未到达队列触发returnCallback回调" +
"\nmessage ===> " + message +
"\nreplyCode ===> " + replyCode +
"\nreplyText ===> " + replyText +
"\nexchange ===> " + exchange +
"\nroutingKey ===> " + routingKey);
System.out.println("==================================================");
});
}
}
4. 初始化交换机与队列,绑定交换机与队列
@Configuration
public class MyRabbitMQConfig {
/**
* 交换机
* Topic,可以绑定多个队列
*/
@Bean
public Exchange stockEventExchange() {
//String name, boolean durable, boolean autoDelete, Map<String, Object> arguments
return new TopicExchange("stock-event-exchange", true, false);
}
/**
* 死信队列
* 释放库存
*/
@Bean
public Queue stockReleaseStockQueue() {
//String name, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
return new Queue("stock.release.stock.queue", true, false, false);
}
/**
* 延时队列
* 锁定库存
*/
@Bean
public Queue stockDelay() {
HashMap<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange", "stock-event-exchange");
arguments.put("x-dead-letter-routing-key", "stock.release");
// 消息过期时间 1.5分钟
arguments.put("x-message-ttl", 90000);
return new Queue("stock.delay.queue", true, false, false, arguments);
}
/**
* 绑定:交换机与死信队列
* 释放库存
*/
@Bean
public Binding stockLocked() {
//String destination, DestinationType destinationType, String exchange, String routingKey,
// Map<String, Object> arguments
return new Binding("stock.release.stock.queue",
Binding.DestinationType.QUEUE,
"stock-event-exchange",
"stock.release.#",
null);
}
/**
* 绑定:交换机与延时队列
* 锁定库存
*/
@Bean
public Binding stockLockedBinding() {
return new Binding("stock.delay.queue",
Binding.DestinationType.QUEUE,
"stock-event-exchange",
"stock.locked",
null);
}
}
四、业务实现:
1. 库存表与实体类
CREATE TABLE `kmall-ware`.`wms_ware_sku` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'id',
`sku_id` bigint(20) NULL DEFAULT NULL COMMENT 'sku_id',
`ware_id` bigint(20) NULL DEFAULT NULL COMMENT '仓库id',
`stock` int(11) NULL DEFAULT NULL COMMENT '库存数',
`sku_name` varchar(200) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT 'sku_name',
`stock_locked` int(11) NULL DEFAULT 0 COMMENT '锁定库存',
PRIMARY KEY (`id`) USING BTREE,
INDEX `sku_id`(`sku_id`) USING BTREE,
INDEX `ware_id`(`ware_id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 12 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci COMMENT = '商品库存' ROW_FORMAT = Dynamic;
@Data
@TableName("wms_ware_sku")
public class WareSkuEntity implements Serializable {
private static final long serialVersionUID = 1L;
/**
* id
*/
@TableId
private Long id;
/**
* sku_id
*/
private Long skuId;
/**
* 仓库id
*/
private Long wareId;
/**
* 库存数
*/
private Integer stock;
/**
* sku_name
*/
private String skuName;
/**
* 锁定库存
*/
private Integer stockLocked;
}
2. 库存工作单与明细
CREATE TABLE `kmall-ware`.`wms_ware_order_task` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'id',
`order_id` bigint(20) NULL DEFAULT NULL COMMENT 'order_id',
`order_sn` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT 'order_sn',
`consignee` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '收货人',
`consignee_tel` char(15) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '收货人电话',
`delivery_address` varchar(500) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '配送地址',
`order_comment` varchar(200) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '订单备注',
`payment_way` tinyint(1) NULL DEFAULT NULL COMMENT '付款方式【 1:在线付款 2:货到付款】',
`task_status` tinyint(2) NULL DEFAULT NULL COMMENT '任务状态',
`order_body` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '订单描述',
`tracking_no` char(30) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '物流单号',
`create_time` datetime(0) NULL DEFAULT CURRENT_TIMESTAMP(0) COMMENT 'create_time',
`ware_id` bigint(20) NULL DEFAULT NULL COMMENT '仓库id',
`task_comment` varchar(500) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '工作单备注',
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 35 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci COMMENT = '库存工作单' ROW_FORMAT = Dynamic;
CREATE TABLE `kmall-ware`.`wms_ware_order_task_detail` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'id',
`sku_id` bigint(20) NULL DEFAULT NULL COMMENT 'sku_id',
`sku_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT 'sku_name',
`sku_num` int(11) NULL DEFAULT NULL COMMENT '购买个数',
`task_id` bigint(20) NULL DEFAULT NULL COMMENT '工作单id',
`ware_id` bigint(20) NULL DEFAULT NULL COMMENT '仓库id',
`lock_status` int(1) NULL DEFAULT NULL COMMENT '1-已锁定 2-已解锁 3-扣减',
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 50 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci COMMENT = '库存工作单' ROW_FORMAT = Dynamic;
实体类:
@Data
@TableName("wms_ware_order_task")
public class WareOrderTaskEntity implements Serializable {
private static final long serialVersionUID = 1L;
/**
* id
*/
@TableId
private Long id;
/**
* order_id
*/
private Long orderId;
/**
* order_sn
*/
private String orderSn;
/**
* 收货人
*/
private String consignee;
/**
* 收货人电话
*/
private String consigneeTel;
/**
* 配送地址
*/
private String deliveryAddress;
/**
* 订单备注
*/
private String orderComment;
/**
* 付款方式【 1:在线付款 2:货到付款】
*/
private Integer paymentWay;
/**
* 任务状态
*/
private Integer taskStatus;
/**
* 订单描述
*/
private String orderBody;
/**
* 物流单号
*/
private String trackingNo;
/**
* create_time
*/
private Date createTime;
/**
* 仓库id
*/
private Long wareId;
/**
* 工作单备注
*/
private String taskComment;
}
@NoArgsConstructor
@AllArgsConstructor
@Data
@TableName("wms_ware_order_task_detail")
public class WareOrderTaskDetailEntity implements Serializable {
private static final long serialVersionUID = 1L;
/**
* id
*/
@TableId
private Long id;
/**
* sku_id
*/
private Long skuId;
/**
* sku_name
*/
private String skuName;
/**
* 购买个数
*/
private Integer skuNum;
/**
* 工作单id
*/
private Long taskId;
/**
* 仓库id
*/
private Long wareId;
/**
* 1-已锁定 2-已解锁 3-扣减
*/
private Integer lockStatus;
}
3. 订单与明细包装类
@Data
public class OrderItemVO {
private Long skuId;
private Boolean check;
private String title;
private String image;
/**
* 商品套餐属性
*/
private List<String> skuAttrValues;
private BigDecimal price;
private Integer count;
private BigDecimal totalPrice;
/** 商品重量 **/
private BigDecimal weight = new BigDecimal("0.085");
}
@Data
public class WareSkuLockTO {
private String orderSn;
/** 需要锁住的所有库存信息 **/
private List<OrderItemVO> locks;
}
4. 锁定库存
4.1 控制层
@PostMapping(value = "/lock/order")
public R orderLockStock(@RequestBody WareSkuLockTO lockTO) {
try {
wareSkuService.orderLockStock(lockTO);
return R.ok();
} catch (NoStockException e) {
return R.error(NO_STOCK_EXCEPTION.getCode(), NO_STOCK_EXCEPTION.getMsg());
}
}
4.2 实现层
库存锁定,sql执行锁定锁定
- 往库存工作单存储当前锁定(本地事务表)
- 封装待锁定库存项Map
- 查询(库存 - 库存锁定 >= 待锁定库存数)的仓库
- 判断是否查询到仓库
- 将查询出的仓库数据封装成Map,key:skuId val:wareId
- 判断是否为每一个商品项至少匹配了一个仓库
- 所有商品都存在有库存的仓库 锁定库存
- 锁定成功,跳出循环
- 创建库存锁定工作单消息(每一件商品一条消息)
- 往库存工作单详情存储当前锁定(本地事务表)
- 发送库存锁定成功消息
@Transactional
@Override
public Boolean orderLockStock(WareSkuLockTO lockTO) {
// 按照收货地址找到就近仓库,锁定库存(暂未实现)
// 采用方案:获取每项商品在哪些仓库有库存,轮询尝试锁定,任一商品锁定失败回滚
// 1.往库存工作单存储当前锁定(本地事务表)
WareOrderTaskEntity taskEntity = new WareOrderTaskEntity();
taskEntity.setOrderSn(lockTO.getOrderSn());
orderTaskService.save(taskEntity);
// 2.封装待锁定库存项Map
Map<Long, OrderItemVO> lockItemMap = lockTO.getLocks().stream().collect(Collectors.toMap(key -> key.getSkuId(), val -> val));
// 3.查询(库存 - 库存锁定 >= 待锁定库存数)的仓库
List<WareSkuEntity> wareSkuEntities = baseMapper.selectListHasSkuStock(lockItemMap.keySet());
List<WareSkuEntity> wareList = wareSkuEntities.stream().filter(entity -> (entity.getStock() - entity.getStockLocked()) >= lockItemMap.get(entity.getSkuId()).getCount()).collect(Collectors.toList());
// 4.判断是否查询到仓库
if (CollectionUtils.isEmpty(wareList)) {
// 匹配失败,所有商品项没有库存
Set<Long> skuIds = lockItemMap.keySet();
throw new NoStockException(skuIds);
}
// 5.将查询出的仓库数据封装成Map,key:skuId val:wareId
Map<Long, List<WareSkuEntity>> wareMap = wareList.stream().collect(Collectors.groupingBy(key -> key.getSkuId()));
// 6.判断是否为每一个商品项至少匹配了一个仓库
List<WareOrderTaskDetailEntity> taskDetails = new ArrayList<>();// 库存锁定工作单详情
Map<Long, StockLockedTO> lockedMessageMap = new HashMap<>();// 库存锁定工作单消息
if (wareMap.size() < lockTO.getLocks().size()) {
// 匹配失败,部分商品没有库存
Set<Long> skuIds = lockItemMap.keySet();
skuIds.removeAll(wareMap.keySet());// 求商品项差集
throw new NoStockException(skuIds);
} else {
// 所有商品都存在有库存的仓库
// 7.锁定库存
for (Map.Entry<Long, List<WareSkuEntity>> entry : wareMap.entrySet()) {
Boolean skuStocked = false;
Long skuId = entry.getKey();// 商品
OrderItemVO item = lockItemMap.get(skuId);
Integer count = item.getCount();// 待锁定个数
List<WareSkuEntity> hasStockWares = entry.getValue();// 有足够库存的仓库
for (WareSkuEntity ware : hasStockWares) {
Long num = baseMapper.lockSkuStock(skuId, ware.getWareId(), count);
if (num == 1) {
// 8. 锁定成功,跳出循环
skuStocked = true;
// 创建库存锁定工作单详情(每一件商品锁定详情)
WareOrderTaskDetailEntity taskDetail = new WareOrderTaskDetailEntity(null, skuId,
item.getTitle(), count, taskEntity.getId(), ware.getWareId(),
WareOrderTaskConstant.LockStatusEnum.LOCKED.getCode());
taskDetails.add(taskDetail);
//9。 创建库存锁定工作单消息(每一件商品一条消息)
StockDetailTO detailMessage = new StockDetailTO();
BeanUtils.copyProperties(taskDetail, detailMessage);
StockLockedTO lockedMessage = new StockLockedTO(taskEntity.getId(), detailMessage);
lockedMessageMap.put(skuId, lockedMessage);
break;
}
}
if (!skuStocked) {
// 匹配失败,当前商品所有仓库都未锁定成功
throw new NoStockException(skuId);
}
}
}
// 10.往库存工作单详情存储当前锁定(本地事务表)
orderTaskDetailService.saveBatch(taskDetails);
// 11.发送消息
for (WareOrderTaskDetailEntity taskDetail : taskDetails) {
StockLockedTO message = lockedMessageMap.get(taskDetail.getSkuId());
message.getDetail().setId(taskDetail.getId());// 存储库存详情ID
rabbitTemplate.convertAndSend("stock-event-exchange", "stock.locked", message);
}
return true;
}
5、监听死信队列stock.release.stock.queue
/**
* 解锁库存,监听死信队列
*
* @author: charlin
**/
@Slf4j
@RabbitListener(queues = "stock.release.stock.queue")
@Component
public class StockReleaseListener {
@Autowired
private WareSkuService wareSkuService;
/**
* 库存解锁(监听死信队列)
* 场景:
* 1.下订单成功【需要解锁】(订单过期未支付、被用户手动取消、其他业务调用失败(订单回滚))
* 2.下订单失败【无需解锁】(库存锁定失败(库存锁定已回滚,但消息已发出))
* <p>
* 注意:需要开启手动确认,不要删除消息,当前解锁失败需要重复解锁
*/
@RabbitHandler
public void handleStockLockedRelease(StockLockedTO locked, Message message, Channel channel) throws IOException {
log.debug("库存解锁,库存工作单详情ID:" + locked.getDetail().getId());
//当前消息是否重新派发过来
// Boolean redelivered = message.getMessageProperties().getRedelivered();
try {
// 解锁库存
wareSkuService.unLockStock(locked);
// 解锁成功,手动确认
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
// 解锁失败,消息入队
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
}
}
/**
* 客户取消订单,监听到消息
*/
@RabbitHandler
public void handleOrderCloseRelease(OrderTO orderTo, Message message, Channel channel) throws IOException {
log.debug("订单关闭准备解锁库存,订单号:" + orderTo.getOrderSn());
try {
wareSkuService.unLockStock(orderTo);
// 手动删除消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
// 解锁失败 将消息重新放回队列,让别人消费
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
}
}
}
6. 库存解锁
/**
* 库存解锁
*/
@Override
public void unLockStock(StockLockedTO locked) throws Exception {
StockDetailTO taskDetailTO = locked.getDetail();// 库存工作单详情TO
WareOrderTaskDetailEntity taskDetail = orderTaskDetailService.getById(taskDetailTO.getId());// 库存工作单详情Entity
if (taskDetail != null) {
// 1.工作单未回滚,需要解锁
WareOrderTaskEntity task = orderTaskService.getById(locked.getId());// 库存工作单Entity
R r = orderFeignService.getOrderByOrderSn(task.getOrderSn());// 订单Entity
if (r.getCode() == 0) {
// 订单数据返回成功
OrderTO order = r.getData(new TypeReference<OrderTO>() {
});
if (order == null || OrderConstant.OrderStatusEnum.CANCLED.getCode().equals(order.getStatus())) {
// 2.订单已回滚 || 订单未回滚已取消状态
if (WareOrderTaskConstant.LockStatusEnum.LOCKED.getCode().equals(taskDetail.getLockStatus())) {
// 订单已锁定状态,需要解锁(消息确认)
unLockStock(taskDetailTO.getSkuId(), taskDetailTO.getWareId(), taskDetailTO.getSkuNum(), taskDetailTO.getId());
} else {
// 订单其他状态,不可解锁(消息确认)
}
}
} else {
// 订单远程调用失败(消息重新入队)
throw new Exception();
}
} else {
// 3.无库存锁定工作单记录,已回滚,无需解锁(消息确认)
}
}
/**
* 库存解锁
* 订单解锁触发,防止库存解锁消息优先于订单解锁消息到期,导致库存无法解锁
*/
@Transactional
@Override
public void unLockStock(OrderTO order) {
String orderSn = order.getOrderSn();// 订单号
// 1.根据订单号查询库存锁定工作单
WareOrderTaskEntity task = orderTaskService.getOrderTaskByOrderSn(orderSn);
// 2.按照工作单查询未解锁的库存,进行解锁
List<WareOrderTaskDetailEntity> taskDetails = orderTaskDetailService.list(new QueryWrapper<WareOrderTaskDetailEntity>()
.eq("task_id", task.getId())
.eq("lock_status", WareOrderTaskConstant.LockStatusEnum.LOCKED.getCode()));// 并发问题
// 3.解锁库存
for (WareOrderTaskDetailEntity taskDetail : taskDetails) {
unLockStock(taskDetail.getSkuId(), taskDetail.getWareId(), taskDetail.getSkuNum(), taskDetail.getId());
}
}
/**
* 库存解锁
* 1.sql执行释放锁定
* 2.更新库存工作单状态为已解锁
*
* @param skuId
* @param wareId
* @param count
*/
public void unLockStock(Long skuId, Long wareId, Integer count, Long taskDetailId) {
// 1.库存解锁
baseMapper.unLockStock(skuId, wareId, count);
// 2.更新工作单的状态 已解锁
WareOrderTaskDetailEntity taskDetail = new WareOrderTaskDetailEntity();
taskDetail.setId(taskDetailId);
taskDetail.setLockStatus(WareOrderTaskConstant.LockStatusEnum.UNLOCKED.getCode());
orderTaskDetailService.updateById(taskDetail);
}