目录
一、概述
二、步骤
三、说明
四、详细步骤
五、总结
一、概述
在订单服务中使用到了消息队列 具体就是解决关单还有自动解锁库存的功能
其实就是使用消息队列的延迟队列的功能 达到一个定时任务的作用
使用消息队列到达最终一致性的效果
比如说库存 当下单之后 执行锁库存的远程方法
如果说下订单的那个方法,在锁库存之后出现了异常,那么这个下订单的方法会回滚,但是这个锁库存的远程方法,已经锁定,就不能够再回滚了。这样就会造成订单没有下成,但是库存却锁定了
二、步骤
一旦开始下单,锁库存之后,就往这个库存的延迟队列里面发送消息,在库存服务中,消费这个消息,来选择是否需要解锁库存
三、说明
其实这一大堆的代码,可以使用分布式事务的一个注解就能解决,因为这就是一个服务与服务之间的那种独立性,造成只能回滚订单,不能回滚库存的这样一种局面,而使用这种分布式事务,就能将多个服务视为一个事务中,也就能够一处出问题,处处回滚。
但是分布式事务相当于整个完全就是一个大事务,相当于串行化了,效率就低了。但是它的好处能够保证即时一致性
而现在这种下单的业务,我们对高并发,效率这方面有要求,而其实并不需要强一致性,虽然当初数据不合理,就让它不合理,只要最终弥补回来就行了
其实就是自动解锁库存,利用了消息队列的这种消息发布与订阅达到最终一致性的效果
四、详细步骤
整个流程图:
1. 搭建消息队列环境
1.1 将消息队列的依赖导入:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
1.2 启动类上面启动消息队列服务
加上@EnableRabbit注解
1.3 配置消息转换器使得发消息还有收消息能以实体类的方式接收
@Configuration
public class MyRabbitConfig {
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
}
创建队列、交换机、绑定关系
创建队列:延迟队列、死信队列
注意:这个延迟队列的超时时间不能使用字符串,得使用整形数字
订单相关:
@Configuration
public class RabbitMQBeanConfig {
@Bean
public Queue orderDelayQueue(){
//String name, boolean durable, boolean exclusive, boolean autoDelete, @Nullable Map<String, Object> arguments
HashMap<String, Object> map = new HashMap<>();
map.put("x-dead-letter-exchange","order-event-exchange");
map.put("x-dead-letter-routing-key","order.release.order");
map.put("x-message-ttl",60000);
return new Queue("order-delay-queue",true,false,false,map);
}
@Bean
public Queue orderReleaseStockQueue(){
return new Queue("order.release.order.queue",true,false,false,null);
}
@Bean
public Exchange orderEventExchange(){
return new TopicExchange("order-event-exchange",true,false);
}
@Bean
public Binding createOrder(){
//String destination, DestinationType destinationType,
// String exchange, String routingKey, @Nullable Map<String, Object> arguments
return new Binding("order-delay-queue", Binding.DestinationType.QUEUE,"order-event-exchange","order.create.order",null);
}
@Bean
public Binding releaseOrder(){
//String destination, DestinationType destinationType,
// String exchange, String routingKey, @Nullable Map<String, Object> arguments
return new Binding("order.release.order.queue", Binding.DestinationType.QUEUE,"order-event-exchange","order.release.order",null);
}
@Bean
public Binding OrderReleaseOther(){
return new Binding("stock.release.stock.queue", Binding.DestinationType.QUEUE,"order-event-exchange","order.release.other.#",null);
}
}
库存相关:
@Configuration
public class RabbitMQBeanConfig {
@Bean
public Queue stockDelayQueue(){
//String name, boolean durable, boolean exclusive, boolean autoDelete, @Nullable Map<String, Object> arguments
HashMap<String, Object> map = new HashMap<>();
map.put("x-dead-letter-exchange","stock-event-exchange");
map.put("x-dead-letter-routing-key","stock.release.stock");
map.put("x-message-ttl",120000);
return new Queue("stock-delay-queue",true,false,false,map);
}
@Bean
public Queue stockReleaseStockQueue(){
return new Queue("stock.release.stock.queue",true,false,false,null);
}
@Bean
public Exchange stockEventExchange(){
return new TopicExchange("stock-event-exchange",true,false);
}
@Bean
public Binding stockLocked(){
//String destination, DestinationType destinationType,
// String exchange, String routingKey, @Nullable Map<String, Object> arguments
return new Binding("stock-delay-queue", Binding.DestinationType.QUEUE,"stock-event-exchange","stock.locked",null);
}
@Bean
public Binding stockRelease(){
//String destination, DestinationType destinationType,
// String exchange, String routingKey, @Nullable Map<String, Object> arguments
return new Binding("stock.release.stock.queue", Binding.DestinationType.QUEUE,"stock-event-exchange","stock.release.#",null);
}
}
2. 发消息
自动解锁库存:
完整锁库存:
@Override
@Transactional
public Boolean lockWareStock(WareStockIdsDto wareStockIdsDto) {
//创建订单工作单
WareOrderTask wareOrderTask = new WareOrderTask();
wareOrderTask.setOrderSn(wareStockIdsDto.getOrderSn());
wareOrderTaskService.save(wareOrderTask);
List<OrderItem> orderItems = wareStockIdsDto.getOrderItems();
orderItems.stream().forEach(item->{
WareStockIds wareStockIds = new WareStockIds();
wareStockIds.setSkuId(item.getSkuId());
//查询商品的所有库存信息
List<Long> wareIds = this.getBaseMapper().wareStockIds(item.getSkuId());
wareStockIds.setWareIds(wareIds);
//锁库存
if(wareIds!=null && wareIds.size()>0){
boolean flag = false;
for (Long wareId : wareIds) {
int row = this.getBaseMapper().lockWareStock(item.getSkuId(), wareId, item.getSkuQuantity());
if(row==1){
flag = true;
WareOrderTaskDetail wareOrderTaskDetail = new WareOrderTaskDetail();
wareOrderTaskDetail.setTaskId(wareOrderTask.getId());
wareOrderTaskDetail.setSkuId(item.getSkuId());
wareOrderTaskDetail.setWareId(wareId);
wareOrderTaskDetail.setSkuNum(item.getSkuQuantity());
wareOrderTaskDetail.setLockStatus(StockStatus.LOCK);
wareOrderTaskDetailService.save(wareOrderTaskDetail);
WareStockTaskTo wareStockTaskTo = new WareStockTaskTo();
wareStockTaskTo.setTaskId(wareOrderTask.getId());
wareStockTaskTo.setTaskDetailId(wareOrderTaskDetail.getId());
rabbitTemplate.convertAndSend("stock-event-exchange","stock.locked",wareStockTaskTo);
break;
}
}
if(!flag){
throw new WareStockEmptyException(item.getSkuId()+"的商品库存不足");
}
}else{
//库存不足,则直接抛出异常,整个锁定库存失败
throw new WareStockEmptyException(item.getSkuId()+"的商品库存不足");
}
});
return true;
}
发消息:发送到库存延迟队列
一旦锁库存成功,就需要发送消息,检测这个库存锁定是否合理;
rabbitTemplate.convertAndSend("stock-event-exchange","stock.locked",wareStockTaskTo);
发送之前需要保存工作单、工作单详情也就是相当于记录日志的方式,方便之后消息消费的时候来解锁库存:
3. 消费消息
注意:使用消息消费监听器进行消费,不要忘记一旦消费失败就要重新将消息放入到消息队列中
当消费成功之后,要ack消息 手动提交 只有这样才能防止消息丢失,因为如果是默认自动提交,那么如果消息消费的时候出现了异常,消息并没有合理消费,此时是需要重试的,但是已经自动ack了,就没有消息了。
@RabbitListener(queues = "stock.release.stock.queue")
@Component
@Slf4j
public class StockReleaseStockQueueListen {
@Autowired
private IWareSkuService wareSkuService;
@RabbitHandler
public void handle(WareStockTaskTo wareStockTaskTo, Message message, Channel channel) throws IOException {
try{
log.info("handle开始解锁库存...{}",wareStockTaskTo);
wareSkuService.unLock(wareStockTaskTo);
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}catch (Exception e){
channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
}
}
}
@Override
public void unLock(WareStockTaskTo wareStockTaskTo) {
Long taskDetailId = wareStockTaskTo.getTaskDetailId();
WareOrderTaskDetail orderTaskDetail = wareOrderTaskDetailService.getById(taskDetailId);
//如果工作单详情为空,则代表库存服务本身自己就回滚了,无需解锁
if(orderTaskDetail!=null){
Long taskId = wareStockTaskTo.getTaskId();
WareOrderTask wareOrderTask = wareOrderTaskService.getById(taskId);
//通过工作单进而得到订单
String orderSn = wareOrderTask.getOrderSn();
OrderTo order = orderFeignService.getOrder(orderSn);
//当订单不存在或者是订单已被取消则需要解锁库存
if(order==null || OrderStatusEnum.CANCLED.equals(order.getStatus().intValue())){
//只有是上锁状态才能解库存
//这样其实就保证了幂等性
if(StockStatus.LOCK.equals(orderTaskDetail.getLockStatus())){
unlock(orderTaskDetail);
}
}
}
}
private void unlock(WareOrderTaskDetail orderTaskDetail) {
getBaseMapper().unLock(orderTaskDetail.getSkuId(), orderTaskDetail.getWareId(), orderTaskDetail.getSkuNum());
WareOrderTaskDetail wareOrderTaskDetail = new WareOrderTaskDetail();
wareOrderTaskDetail.setId(orderTaskDetail.getId());
//将工作单详情状态修改为已解锁状态
wareOrderTaskDetail.setLockStatus(StockStatus.UN_LOCK);
wareOrderTaskDetailService.updateById(wareOrderTaskDetail);
}
自动收单:
提交订单:
@Transactional
//@GlobalTransactional
@Override
public OrderRespTo submitOrder(SubmitOrderDto submitOrderDto) {
OrderRespTo orderRespTo = createOrder(submitOrderDto);
if(orderRespTo.getCode().equals(0)){
//保存订单
saveOrder(orderRespTo);
//发送消息 释放订单
Order order = orderRespTo.getOrder();
rabbitTemplate.convertAndSend("order-event-exchange","order.create.order",order);
//锁定库存
WareStockIdsVo wareStockIdsVo = new WareStockIdsVo();
wareStockIdsVo.setOrderSn(orderRespTo.getOrder().getOrderSn());
wareStockIdsVo.setOrderItems(orderRespTo.getOrderItems());
//调用远程库存服务锁库存
AjaxResult result = wareFeignService.lockWareStok(wareStockIdsVo);
//int i = 10/0;
if(result.get("code").equals(200)){
return orderRespTo;
}else{
orderRespTo.setCode(3);
}
}
return orderRespTo;
}
发消息:只要保存了订单就需要往订单的延迟队列中发消息
rabbitTemplate.convertAndSend("order-event-exchange","order.create.order",order);
消费消息:
消息消费监听器
@Component
@RabbitListener(queues = "order.release.order.queue")
public class OrderReleaseOrderQueueListener {
@Autowired
private IOrderService orderService;
@RabbitHandler
public void handleReleaseOrder(Order order, Message message, Channel channel) throws IOException {
try{
orderService.releaseOrder(order);
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}catch (Exception e){
channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
}
}
}
@Override
public void releaseOrder(Order order) {
Order currentOrder = getById(order.getId());
//如果此时订单还是一个新建状态 未支付状态 就需要释放订单
if (OrderStatusEnum.CREATE_NEW.getCode().equals(currentOrder.getStatus().intValue())) {
Order order1 = new Order();
order1.setId(currentOrder.getId());
order1.setStatus(new Long(OrderStatusEnum.CANCLED.getCode().intValue()));
updateById(order1);
OrderTo orderTo = BeanCopyUtils.copyBean(order, OrderTo.class);
//向库存服务发送消息
rabbitTemplate.convertAndSend("stock-event-exchange","stock.release.other",orderTo);
}
}
看到代码中,存在这一步:
//向库存服务发送消息
rabbitTemplate.convertAndSend("stock-event-exchange","stock.release.other",orderTo);
为什么还要向库存服务发送消息,来解锁库存呢?
是为了防止如果在关单的时候阻塞了,订单的状态还没来得及修改,而此时需要消费库存消息了,此时判断订单状态并不是已取消状态,则无法解锁库存,造成库存永远无法解锁
正常情况下消费库存在关单之后就不会出现这种问题,但是发生了阻塞就没办法了。
所以就得在关单之后同时还要发消息,通知库存服务来判断订单的真实状态,如果是取消状态,则需要解锁库存
而且现在不再向延迟队列里面发送消息,而是直接向死信队列中发送消息,来消息这个消息,因此死信队列现在又多了一个消费者,两个消费者同时订阅这个消息,通过消息实体类的不同区分不同的消费逻辑。
@RabbitHandler
public void handle1(OrderTo orderTo,Message message, Channel channel) throws IOException {
try{
log.info("handle1开始解锁库存...{}",orderTo);
wareSkuService.unLock(orderTo);
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}catch (Exception e){
channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
}
}
@Override
public void unLock(com.sq.gulimall.to.mq.OrderTo orderTo) {
String orderSn = orderTo.getOrderSn();
Long status = orderTo.getStatus();
//当订单是取消状态时解锁库存
if(OrderStatusEnum.CANCLED.getCode().equals(status.intValue())){
LambdaQueryWrapper<WareOrderTask> wrapper = new LambdaQueryWrapper<WareOrderTask>().eq(WareOrderTask::getOrderSn, orderSn);
WareOrderTask orderTask = wareOrderTaskService.getOne(wrapper);
LambdaQueryWrapper<WareOrderTaskDetail> wrapper1 = new LambdaQueryWrapper<WareOrderTaskDetail>().eq(WareOrderTaskDetail::getTaskId, orderTask.getId());
List<WareOrderTaskDetail> list = wareOrderTaskDetailService.list(wrapper1);
for (WareOrderTaskDetail wareOrderTaskDetail : list) {
//同样也是只有当库存是已锁定状态时才需要去解锁
if(StockStatus.LOCK.equals(wareOrderTaskDetail.getLockStatus())){
unlock(wareOrderTaskDetail);
}
}
}
}
private void unlock(WareOrderTaskDetail orderTaskDetail) {
getBaseMapper().unLock(orderTaskDetail.getSkuId(), orderTaskDetail.getWareId(), orderTaskDetail.getSkuNum());
WareOrderTaskDetail wareOrderTaskDetail = new WareOrderTaskDetail();
wareOrderTaskDetail.setId(orderTaskDetail.getId());
//将工作单详情状态修改为已解锁状态
wareOrderTaskDetail.setLockStatus(StockStatus.UN_LOCK);
wareOrderTaskDetailService.updateById(wareOrderTaskDetail);
}
五、总结
使用消息队列解决多服务下面的数据不一致的问题,通过最终一致性解决