Direct交换机是路由键精准匹配
Fanout交换机是不看路由键 ,只要你消息发给了某个交换机,这个交换机就立马把消息转给绑定了这个交换机的所有队列,所以速度最快
Topic交换机可以把一个消息根据交换机和消息队列的绑定的路由键进行匹配,把这个消息发送到所有符合路由键规则的消息队列上
特别注意:使用RabbitMQ发送消息时只需要给交换机指定的路由键就可以路由到对应的队列,前提是要先在配置里设置好交换机和队列的绑定关系(也就是发送消息时是指定路由键,不是指定队列名称)
使用RabbitMQ的准备工作
1、在虚拟机LInux上用docker安装RabbitMQ(前提是安装了虚拟机上已经docker容器)
docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672-p 4369:4369-p 25672:25672-p 15671:15671-p 15672:15672 rabbitmq:management
2、导入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
3、application.properties配置文件中配置下面常用的RabbitMQ配置
spring.rabbitmq.host=192.168.241.128
spring.rabbitmq.port=5672
#虚拟主机
spring.rabbitmq.virtual-host=/
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
## 开启发送端消息抵达Broker确认(确认类型为交互correlated)
#也就是上面的confirmCallback模式,用于确定生产者发送的消息成功被broke服务器接收到了
spring.rabbitmq.publisher-confirm-type=correlated
## 开启发送端消息抵达Queue确认,下面两句配置一般都是一起的
#也就是上面的returnCallback模式,用于确认交换机把这个消息成功放到了消费队列中去
spring.rabbitmq.publisher-returns=true
## 只要消息抵达Queue,就会异步发送优先回调returnfirm
spring.rabbitmq.template.mandatory=true
## 使用手动ack消息,不使用默认的消费端确认---自动确认机制(使用默认的ack会出现丢失消息的情况)
#自动确认机制是只要被消费了就会删除队列中的这个消息,不管这个消息有没有真的被成功消费
#所以可能出现消息还没成功消费然后宕机了,所有消息都没了,就出现了消息丢失的情况
#手动ack就是成功消费了一条消息就删除一条消息,不会出现消息丢失的情况
spring.rabbitmq.listener.simple.acknowledge-mode=manual
这里特别注意:我是安装RabbitMQ的时候修改了默认的用户账号和密码为admin,所以修改了后每次使用RabbitMQ都要记得配置这个账号密码为admin
4、在主启动类上配置@EnableRabbit注解
5、 RabbitMQ配置类
package com.saodai.saodaimall.order.config;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* RabbitMQ配置类
**/
@Configuration
public class MyRabbitConfig {
//设置rabbitMQ序列化为json
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
}
发送消息,如果发送的消息是个对象,我们会使用序列化机制,将对象写出去。对象必须实现serializable,同时消息内容也是系列化后的内容(一串很长的乱码),上面的配置内容就是让把对象内容转成json字符串,方便阅读,但是对象还是要序列化才可以通过rabbitMQ进行发送
6、创建消息队列、交换机、交换机和队列的绑定关系都是通过配置类来实现的
package com.saodai.saodaimall.ware.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
/**
* RabbitMQ配置类(一个交换机,两个队列,两个绑定,跟订单服务的基本一样,详细介绍看订单服务的队列)
*
*/
@Configuration
public class MyRabbitMQConfig {
/**
* 使用JSON序列化机制,进行消息转换
* @return
*/
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
/**
* RabbitMQ要第一次连接上发现没有队列或者交换机才会创建,所以如果没有下面的代码运行会发现官网中并没有创建交换机和队列
* 下面代码就是模拟监听,这样就可以连接上RabbitMQ,然后可以创建交换机和队列
* 但是后面要注释掉(自动解锁库存时这里也会监听队列导致多一个消费者,所以要注释掉)
*/
// @RabbitListener(queues = "stock.release.stock.queue")
// public void handle(Message message) {
//
// }
/**
* 库存服务默认的交换机
* @return
*/
@Bean
public Exchange stockEventExchange() {
//String name, boolean durable, boolean autoDelete, Map<String, Object> arguments
TopicExchange topicExchange = new TopicExchange("stock-event-exchange", true, false);
return topicExchange;
}
/**
* 普通队列
* @return
*/
@Bean
public Queue stockReleaseStockQueue() {
//String name, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
Queue queue = new Queue("stock.release.stock.queue", true, false, false);
return queue;
}
/**
* 延迟队列
* @return
*/
@Bean
public Queue stockDelay() {
HashMap<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange", "stock-event-exchange");
arguments.put("x-dead-letter-routing-key", "stock.release");
// 消息过期时间 2分钟
arguments.put("x-message-ttl", 120000);
Queue queue = new Queue("stock.delay.queue", true, false, false,arguments);
return queue;
}
/**
* 交换机与普通队列绑定
* @return
*/
@Bean
public Binding stockLocked() {
//String destination, DestinationType destinationType, String exchange, String routingKey,
// Map<String, Object> arguments
Binding binding = new Binding("stock.release.stock.queue",
Binding.DestinationType.QUEUE,
"stock-event-exchange",
"stock.release.#",
null);
return binding;
}
/**
* 交换机与延迟队列绑定
* @return
*/
@Bean
public Binding stockLockedBinding() {
return new Binding("stock.delay.queue",
Binding.DestinationType.QUEUE,
"stock-event-exchange",
"stock.locked",
null);
}
}
本商城项目哪里用到了RabbitMQ
订单服务
1、在MyRabbitMQConfig配置类中创建队列、交换机、队列和交换机的绑定关系
订单服务和库存服务都是创建一个主题交换机,两个队列,一个队列是用来存放消息的,另外一个队列是用来存放死信的(也就是死了的消息,这里并没有直接处理掉,而是放到这个队列里)
整体思路
首先生产者发送一个消息给topic交换机order-event-exchange,交换机根据路由键order.create.order路由到延时队列order.delay.queue,然后消息在延时队列里等待指定的时间,当时间过期后还没有被消费就会被当成死信,然后把这个消息通过交换机order-event-exchange识别他的新的路由键order.release.order路由到新的队列order.release.order.queue
package com.saodai.saodaimall.order.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
/**
* RabbitMQ配置类
* 整体思路
* 首先生产者发送一个消息给topic交换机order-event-exchange,交换机根据路由键order.create.order路由到延时队列order.delay.queue
* 然后消息在延时队列里等待指定的时间,当时间过期后还没有被消费就会被当成死信,然后把这个消息通过交换机order-event-exchange识别他的新的
* 路由键order.release.order路由到新的队列order.release.order.queue
*
* 这里只有一个交换机,两个队列,一个队列是用来存放消息的,另外一个队列是用来存放死信的(也就是死了的消息,这里并没有直接处理掉,而是放到这个队列里)
**/
@Configuration
public class MyRabbitMQConfig {
/**
*
*创建延时队列
*延时队列是通过参数来设置的
* arguments.put("x-dead-letter-exchange", "order-event-exchange");前面的固定的前缀,表示这个队列延时后的消息
* @return
*/
@Bean
public Queue orderDelayQueue() {
//用map构造参数
HashMap<String, Object> arguments = new HashMap<>();
//指定延时后的消息的交换机(x-dead-letter-exchange是固定的前缀,order-event-exchange是自定义的交换机)
arguments.put("x-dead-letter-exchange", "order-event-exchange");
//死信路由(也就是消息如果超时了就会被当作死信,然后通过路由键order.release.order路由到指定的消息队列)
arguments.put("x-dead-letter-routing-key", "order.release.order");
//设置消息过期时间
arguments.put("x-message-ttl", 60000); // 消息过期时间 1分钟
/*
Queue(String name, 队列名字
boolean durable, 是否持久化
boolean exclusive, 是否排他
boolean autoDelete, 是否自动删除
Map<String, Object> arguments) 参数
*/
Queue queue = new Queue("order.delay.queue", true, false, false, arguments);
return queue;
}
/**
* 死信队列(也就是到了这个队列的都是要死的消息)
*
* @return
*/
@Bean
public Queue orderReleaseQueue() {
Queue queue = new Queue("order.release.order.queue", true, false, false);
return queue;
}
/**
* TopicExchange
*创建主题类型的交换机
* @return
*/
@Bean
public Exchange orderEventExchange() {
/*
* String name,
* boolean durable,
* boolean autoDelete,
* Map<String, Object> arguments
* */
return new TopicExchange("order-event-exchange", true, false);
}
/**
* 创建交换机和队列的捆绑关系(延时队列捆绑)
* @return
*/
@Bean
public Binding orderCreateBinding() {
/*
* String destination, 目的地(队列名或者交换机名字)
* DestinationType destinationType, 目的地类型(Queue、Exhcange)
* String exchange,
* String routingKey,
* Map<String, Object> arguments
* */
return new Binding("order.delay.queue",
Binding.DestinationType.QUEUE,
"order-event-exchange",
"order.create.order",
null);
}
/**
* 创建交换机和队列的捆绑关系(死信队列捆绑)
* @return
*/
@Bean
public Binding orderReleaseBinding() {
return new Binding("order.release.order.queue",
Binding.DestinationType.QUEUE,
"order-event-exchange",
"order.release.order",
null);
}
/**
* 订单释放直接和库存释放进行绑定
* @return
*/
@Bean
public Binding orderReleaseOtherBinding() {
return new Binding("stock.release.stock.queue",
Binding.DestinationType.QUEUE,
"order-event-exchange",
"order.release.other.#",
null);
}
/**
* 商品秒杀队列
* @return
*/
@Bean
public Queue orderSecKillOrrderQueue() {
Queue queue = new Queue("order.seckill.order.queue", true, false, false);
return queue;
}
@Bean
public Binding orderSecKillOrrderQueueBinding() {
//String destination, DestinationType destinationType, String exchange, String routingKey,
// Map<String, Object> arguments
Binding binding = new Binding(
"order.seckill.order.queue",
Binding.DestinationType.QUEUE,
"order-event-exchange",
"order.seckill.order",
null);
return binding;
}
}
/**
*
*创建延时队列
* @return
*/
@Bean
public Queue orderDelayQueue() {
//用map构造参数
HashMap<String, Object> arguments = new HashMap<>();
//指定延时后的消息的交换机(x-dead-letter-exchange是固定的前缀,order-event-exchange是自定义的交换机)
arguments.put("x-dead-letter-exchange", "order-event-exchange");
//死信路由(也就是消息如果超时了就会被当作死信,然后通过路由键order.release.order路由到指定的消息队列)
arguments.put("x-dead-letter-routing-key", "order.release.order");
//设置消息过期时间
arguments.put("x-message-ttl", 60000); // 消息过期时间 1分钟
/*
Queue(String name, 队列名字
boolean durable, 是否持久化
boolean exclusive, 是否排他
boolean autoDelete, 是否自动删除
Map<String, Object> arguments) 参数
*/
Queue queue = new Queue("order.delay.queue", true, false, false, arguments);
return queue;
}
创建特殊的延时队列只需要传入一个map类型的参数进去就可以让普通队列成为一个延时队列
//指定延时后的消息的交换机(x-dead-letter-exchange是固定的前缀,order-event-exchange是自定义的交换机)
arguments.put("x-dead-letter-exchange", "order-event-exchange");
//死信路由(也就是消息如果超时了就会被当作死信,然后通过路由键order.release.order路由到指定的消息队列)
arguments.put("x-dead-letter-routing-key", "order.release.order");
//设置消息过期时间
arguments.put("x-message-ttl", 60000); // 消息过期时间 1分钟
x-dead-letter-exchange这个key是rabbitMQ封装好了的固定的前缀,表示这个队列延时后的消息指定使用order-event-exchange这个交换机,这个交换机来把延时的消息进行传输,x-dead-letter-routing-key也是rabbitMQ封装好了的固定的前缀,表示这个队列延时后的消息使用的新路由键是order.release.order,x-message-ttl也是rabbitMQ封装好了的固定的前缀,表示设置延时队列的延时时间是多少,也就是上面的三个key值都是封装好的固定前缀,后面的值才是自定义的
package com.saodai.saodaimall.ware.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
/**
* RabbitMQ配置类(一个交换机,两个队列,两个绑定,跟订单服务的基本一样,详细介绍看订单服务的队列)
*
*/
@Configuration
public class MyRabbitMQConfig {
/**
* 使用JSON序列化机制,进行消息转换
* @return
*/
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
/**
* RabbitMQ要第一次连接上发现没有队列或者交换机才会创建,所以如果没有下面的代码运行会发现官网中并没有创建交换机和队列
* 下面代码就是模拟监听,这样就可以连接上RabbitMQ,然后可以创建交换机和队列
* 但是后面要注释掉(自动解锁库存时这里也会监听队列导致多一个消费者,所以要注释掉)
*/
// @RabbitListener(queues = "stock.release.stock.queue")
// public void handle(Message message) {
//
// }
/**
* 库存服务默认的交换机
* @return
*/
@Bean
public Exchange stockEventExchange() {
//String name, boolean durable, boolean autoDelete, Map<String, Object> arguments
TopicExchange topicExchange = new TopicExchange("stock-event-exchange", true, false);
return topicExchange;
}
/**
* 普通队列
* @return
*/
@Bean
public Queue stockReleaseStockQueue() {
//String name, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
Queue queue = new Queue("stock.release.stock.queue", true, false, false);
return queue;
}
/**
* 延迟队列
* @return
*/
@Bean
public Queue stockDelay() {
HashMap<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange", "stock-event-exchange");
arguments.put("x-dead-letter-routing-key", "stock.release");
// 消息过期时间 2分钟
arguments.put("x-message-ttl", 120000);
Queue queue = new Queue("stock.delay.queue", true, false, false,arguments);
return queue;
}
/**
* 交换机与普通队列绑定
* @return
*/
@Bean
public Binding stockLocked() {
//String destination, DestinationType destinationType, String exchange, String routingKey,
// Map<String, Object> arguments
Binding binding = new Binding("stock.release.stock.queue",
Binding.DestinationType.QUEUE,
"stock-event-exchange",
"stock.release.#",
null);
return binding;
}
/**
* 交换机与延迟队列绑定
* @return
*/
@Bean
public Binding stockLockedBinding() {
return new Binding("stock.delay.queue",
Binding.DestinationType.QUEUE,
"stock-event-exchange",
"stock.locked",
null);
}
}
2、理解订单服务和库存服务的RabbitMQ队列的图
(1)订单服务使用RabbitMQ的整个过程:
1、订单创建成功后发送消息给topic主题交换机order-event-exchange,交换机根据order.create.order路由键把消息路由到order.delay.queue延时队列(订单创建成功是指OrderServiceImpl类中的submitOrder方法执行成功后给order.delay.queue队列发送消息)
rabbitTemplate.convertAndSend("order-event-exchange","order.create.order",order.getOrder());
2、消息在order.delay.queue延时队列里等待指定的时间,当时间过期后还没有被消费就会被当成死信,然后把这个消息通过order-event-exchange交换机的新的路由键order.release.order路由到order.release.order.queue队列(过期后的路由键和交换机设置是由订单服务的MyRabbitMQConfig配置的)
注意:这里order.delay.queue队列只是作为延时队列来使用的(正常情况是会有队列的监听器来监听这个队列的消息然后消费掉,但是在这个场景中是没有消费者来消费这个队列的消息的,因为这个队列只需要延时就可以了,并不需要消费者,这个队列的消息等待指定的时间后就会被送到order.release.order.queue队列里,从而达到延时队列的效果)
//交换机(x-dead-letter-exchange是固定的,order-event-exchange是自定义的交换机)
arguments.put("x-dead-letter-exchange", "order-event-exchange");
//死信路由(也就是消息如果超时了就会被当作死信,然后通过路由键order.release.order路由到指定的消息队列)
arguments.put("x-dead-letter-routing-key", "order.release.order");
//设置消息过期时间
arguments.put("x-message-ttl", 60000); // 消息过期时间 1分钟
3、设置一个监听器用来消费order.release.order.queue队列的消息(这个队列的消息都是已经超时了的消息,也就是模拟用户生成订单后没有支付的订单,所以要写个监听器来取消之前生成的订单)
package com.saodai.saodaimall.order.listener;
import com.rabbitmq.client.Channel;
import com.saodai.saodaimall.order.entity.OrderEntity;
import com.saodai.saodaimall.order.service.OrderService;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.io.IOException;
/**
* 订单监听器,监听的是队列order.release.order.queue(定时关闭订单)
* 但凡被这个监听器监听到的消息都是过期的死信
**/
@RabbitListener(queues = "order.release.order.queue")
@Service
public class OrderCloseListener {
@Autowired
private OrderService orderService;
@RabbitHandler
public void listener(OrderEntity orderEntity, Channel channel, Message message) throws IOException {
System.out.println("收到过期的订单信息,准备关闭订单" + orderEntity.getOrderSn());
try {
//关闭订单
orderService.closeOrder(orderEntity);
//消费者的手动ack确认这条消息被成功消费了
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
} catch (Exception e) {
channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
}
}
}
/**
* 关闭订单(这个方法由OrderCloseListener监听器调用)
* 这个方法被调用说明这个订单已经过了指定的时间还没有付款
* 所谓的关闭订单其实就是修改订单的状态,修改成已取消就行了
* @param orderEntity 前面生成订单时发送给RabbitMQ队列的消息orderEntity
*/
@Override
public void closeOrder(OrderEntity orderEntity) {
//关闭订单之前先查询一下数据库,判断此订单状态是否已支付
OrderEntity orderInfo = this.getOne(new QueryWrapper<OrderEntity>().
eq("order_sn",orderEntity.getOrderSn()));
// CREATE_NEW(0,"待付款")(说明这个订单已经过了指定的时间还没有付款)
if (orderInfo.getStatus().equals(OrderStatusEnum.CREATE_NEW.getCode())) {
//如果是待付款状态就可以进行关单
OrderEntity orderUpdate = new OrderEntity();
orderUpdate.setId(orderInfo.getId());
//把待付款修改成已取消的状态即可CANCLED(4,"已取消")
orderUpdate.setStatus(OrderStatusEnum.CANCLED.getCode());
this.updateById(orderUpdate);
/**
这里要考虑一个情况(这个特殊情况是需要下面的额外处理)
* 防止订单服务卡顿,导致订单状态消息一直改不了,也就是上面的代码因为卡顿导致没有执行
解库存服务先执行,查订单状态发现不是取消状态,然后什么都不处理
* 导致卡顿的订单,永远都不能解锁库存
* 所以订单释放直接和库存释放进行绑定
*/
// 发送消息给MQ
OrderTo orderTo = new OrderTo();
BeanUtils.copyProperties(orderInfo, orderTo);
try {
//订单释放直接和库存释放进行绑定
/**
* 订单取消后立马发消息给交换机,交换机把这个消息通过路由键order.release.other发到队列stock.release.stock.queue
* 这个路由设置是由MyRabbitMQConfig中的orderReleaseOtherBinding方法进行绑定的
*/
rabbitTemplate.convertAndSend("order-event-exchange", "order.release.other", orderTo);
} catch (Exception e) {
//TODO 定期扫描数据库,重新发送失败的消息
}
}
}
- 关闭订单之前先查询一下数据库,判断此订单状态是否已支付
- 关闭订单其实就是修改订单的状态,修改成已取消
- 这里要考虑一个情况(这个特殊情况是需要额外的处理的)
-
- 按理来说是订单服务的取消订单操作是在解库存操作的前面的,也就是一般先会取消订单操作后再去解库存操作,但是如果取消订单操作因为网络卡顿导致解库存操作先执行的话就会出现下面的情况:
-
-
- 解库存的实现逻辑又是先来看看订单的状态是不是已取消,如果是已取消才会去解库存,否则就不会执行解库存操作了,上面的情况就会出现解库存操作来看订单状态的时候发现订单状态是待支付,不是已取消状态, 所以就不执行解库存操作,由于解库存操作只会来查看一次,所以就会导致卡顿的订单,永远都不能解锁库存
- 解决办法:订单取消后立马发消息给order-event-exchange交换机,交换机把这个消息通过路由键order.release.other发到stock.release.stock.queue队列,这个队列其中有个监听方法就是来监听这个消息的,只要监听到这个消息就会立马执行解库存
-
rabbitTemplate.convertAndSend("order-event-exchange", "order.release.other", orderTo);
package com.saodai.common.to;
import lombok.Data;
import java.math.BigDecimal;
import java.util.Date;
/**
*订单类
*/
@Data
public class OrderTo {
private Long id;
/**
* member_id
*/
private Long memberId;
/**
* 订单号
*/
private String orderSn;
/**
* 使用的优惠券
*/
private Long couponId;
/**
* create_time
*/
private Date createTime;
/**
* 用户名
*/
private String memberUsername;
/**
* 订单总额
*/
private BigDecimal totalAmount;
/**
* 应付总额
*/
private BigDecimal payAmount;
/**
* 运费金额
*/
private BigDecimal freightAmount;
/**
* 促销优化金额(促销价、满减、阶梯价)
*/
private BigDecimal promotionAmount;
/**
* 积分抵扣金额
*/
private BigDecimal integrationAmount;
/**
* 优惠券抵扣金额
*/
private BigDecimal couponAmount;
/**
* 后台调整订单使用的折扣金额
*/
private BigDecimal discountAmount;
/**
* 支付方式【1->支付宝;2->微信;3->银联; 4->货到付款;】
*/
private Integer payType;
/**
* 订单来源[0->PC订单;1->app订单]
*/
private Integer sourceType;
/**
* 订单状态【0->待付款;1->待发货;2->已发货;3->已完成;4->已关闭;5->无效订单】
*/
private Integer status;
/**
* 物流公司(配送方式)
*/
private String deliveryCompany;
/**
* 物流单号
*/
private String deliverySn;
/**
* 自动确认时间(天)
*/
private Integer autoConfirmDay;
/**
* 可以获得的积分
*/
private Integer integration;
/**
* 可以获得的成长值
*/
private Integer growth;
/**
* 发票类型[0->不开发票;1->电子发票;2->纸质发票]
*/
private Integer billType;
/**
* 发票抬头
*/
private String billHeader;
/**
* 发票内容
*/
private String billContent;
/**
* 收票人电话
*/
private String billReceiverPhone;
/**
* 收票人邮箱
*/
private String billReceiverEmail;
/**
* 收货人姓名
*/
private String receiverName;
/**
* 收货人电话
*/
private String receiverPhone;
/**
* 收货人邮编
*/
private String receiverPostCode;
/**
* 省份/直辖市
*/
private String receiverProvince;
/**
* 城市
*/
private String receiverCity;
/**
* 区
*/
private String receiverRegion;
/**
* 详细地址
*/
private String receiverDetailAddress;
/**
* 订单备注
*/
private String note;
/**
* 确认收货状态[0->未确认;1->已确认]
*/
private Integer confirmStatus;
/**
* 删除状态【0->未删除;1->已删除】
*/
private Integer deleteStatus;
/**
* 下单时使用的积分
*/
private Integer useIntegration;
/**
* 支付时间
*/
private Date paymentTime;
/**
* 发货时间
*/
private Date deliveryTime;
/**
* 确认收货时间
*/
private Date receiveTime;
/**
* 评价时间
*/
private Date commentTime;
/**
* 修改时间
*/
private Date modifyTime;
}
(2)库存服务使用RabbitMQ的整个过程
锁库存成功后就会发消息给stock-event-exchange交换机,交换机根据路由键stock.locked把消息路由到stock.delay.queue延时队列(跟上面一样,这个延时队列的消息不会被消费掉),时间过期后就把消息根据路由键stock.release路由到stock.release.stock.queue队列,然后这个消息队列的消息是被一个专门解库存的监听器来监听(注意这里有两种解库存的监听方法,一个是自动解库存的监听,一个是订单服务的订单取消后立马解库存的监听)
rabbitTemplate.convertAndSend("stock-event-exchange","stock.locked",lockedTo);
package com.saodai.saodaimall.ware.listener;
import com.rabbitmq.client.Channel;
import com.saodai.common.to.OrderTo;
import com.saodai.common.to.mq.StockLockedTo;
import com.saodai.saodaimall.ware.service.WareSkuService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.io.IOException;
/**
* RabbitMQ的监听器
* 这里有两个监听方法,这两个监听识别的依据是看传入的是StockLockedTo还是OrderTo
* 一个是监听的库存自动解锁
* 一个是监听订单取消后库存解锁
*/
@Slf4j
@RabbitListener(queues = "stock.release.stock.queue")
@Service
public class StockReleaseListener {
@Autowired
private WareSkuService wareSkuService;
/**
** 监听库存自动解锁
*/
@RabbitHandler
public void handleStockLockedRelease(StockLockedTo to, Message message, Channel channel) throws IOException {
log.info("******收到解锁库存的信息******");
try {
System.out.println("******收到解锁库存的信息******");
//当前消息是否被第二次及以后(重新)派发过来了
// Boolean redelivered = message.getMessageProperties().getRedelivered();
//解锁库存
wareSkuService.unlockStock(to);
// 手动删除消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
} catch (Exception e) {
// 解锁失败 将消息重新放回队列,让别人消费
channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
}
}
/**
*
* 防止订单服务卡顿,导致订单状态消息一直改不了,库存优先到期,查订单状态新建,什么都不处理
*导致卡顿的订单,永远都不能解锁库存
* 订单释放直接和库存释放进行绑定
* @param orderTo
* @param message
* @param channel
* @throws IOException
*/
@RabbitHandler
public void handleOrderCloseRelease(OrderTo orderTo, Message message, Channel channel) throws IOException {
log.info("******收到订单关闭,准备解锁库存的信息******");
try {
wareSkuService.unlockStock(orderTo);
// 手动删除消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
} catch (Exception e) {
// 解锁失败 将消息重新放回队列,让别人消费
channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
}
}
}
package com.saodai.common.to.mq;
import lombok.Data;
/**
* 发送到mq消息队列的to
**/
@Data
public class StockLockedTo {
/** 库存工作单的id **/
private Long id;
/** 工作单详情的所有信息 StockDetailTo对象内容就是上面的WareOrderTaskDetailEntity **/
private StockDetailTo detailTo;
}
解锁库存的思路
首先查询数据库的库存详细工作单表看看有没有成功锁定库存(如果成功锁库存了会有对应的一条记录),如果没有那就说明库存没有锁成功,那自然就不需要解锁了
- 库存详细工作单表有这条记录那就证明库存锁定成功了
-
- 具体需不需要解库存还要先看订单状态
-
-
- 先查询有没有这个订单,没有这个订单必须解锁库存(可能出现因为有异常造成的数据回滚导致订单不存在的情况,但是库存锁成功了)
- 有这个订单,不一定解锁库存,要根据订单的状态来决定是否解库存
-
-
-
-
- 订单状态是已取消状态,说明是用户没有支付订单过期了,那就必须解锁库存
- 订单状态是已支付状态,说明是用户支付成功了,那就不能解锁库存
-
-
-
-
- 除了判断上面的情况,还有考虑当前库存详细工作单的状态,只有满足订单状态是已取消状态并且是已锁定的状态那才可以解库存
-
-
-
-
- 已锁定:解锁库存
- 已解锁 :不能再解锁
-
-
/**
* (这个方法是由StockReleaseListener监听器调用的)
* 锁库存失败后的自动解锁(也就是回溯)
* @param to
*/
@Override
public void unlockStock(StockLockedTo to) {
//获取库存详细工作单类
StockDetailTo detail = to.getDetailTo();
//库存详细工作单的id
Long detailId = detail.getId();
//WareOrderTaskDetailEntity是库存详细工作单类
WareOrderTaskDetailEntity taskDetailInfo = wareOrderTaskDetailService.getById(detailId);
if (taskDetailInfo != null) {
//查出wms_ware_order_task工作单的信息
Long id = to.getId();
//订单锁库存工作单(获取哪个订单要锁库存)
WareOrderTaskEntity orderTaskInfo = wareOrderTaskService.getById(id);
//获取订单号查询订单状态
String orderSn = orderTaskInfo.getOrderSn();
//远程查询订单信息
R orderData = orderFeignService.getOrderStatus(orderSn);
if (orderData.getCode() == 0) {
//订单数据返回成功
OrderVo orderInfo = orderData.getData("data", new TypeReference<OrderVo>() {});
/**
* CREATE_NEW(0,"待付款"),
* PAYED(1,"已付款"),
* SENDED(2,"已发货"),
* RECIEVED(3,"已完成"),
* CANCLED(4,"已取消"),
* SERVICING(5,"售后中"),
* SERVICED(6,"售后完成");
*/
//订单不存在(因为有异常造成的数据回滚导致订单不存在)或者订单状态是取消状态(orderInfo.getStatus() == 4)才可解库存
if (orderInfo == null || orderInfo.getStatus() == 4) {
//当前库存工作单详情状态1,已锁定,只有当前库存工作单详情状态未解锁才可以解锁
if (taskDetailInfo.getLockStatus() == 1) {
//调用真正接库存的方法unLockStock
unLockStock(detail.getSkuId(),detail.getWareId(),detail.getSkuNum(),detailId);
}
}
} else {
//消息拒绝以后重新放在队列里面,让别人继续消费解锁
//远程调用服务失败
throw new RuntimeException("远程调用服务失败");
}
} else {
//无需解锁
}
}
/**
* 真正解锁库存的方法(自动解库存)
* @param skuId 需要解锁库存的商品id
* @param wareId 需要解锁库存的库存仓库id
* @param num 需要解锁库存的商品数量
* @param taskDetailId 库存工作单详情id
*/
public void unLockStock(Long skuId,Long wareId,Integer num,Long taskDetailId) {
//库存解锁(其实就是修改wms_ware_sku表中的stock_locked的值,之前锁库存锁了多少个就减去多少个)
wareSkuDao.unLockStock(skuId,wareId,num);
//更新工作单的状态
WareOrderTaskDetailEntity taskDetailEntity = new WareOrderTaskDetailEntity();
taskDetailEntity.setId(taskDetailId);
//setLockStatus(2)表示变为已解锁(1表示已锁定,2表示已解锁,3表示减扣)
taskDetailEntity.setLockStatus(2);
wareOrderTaskDetailService.updateById(taskDetailEntity);
}
/**
* 订单取消了就立马解库存
* 防止订单服务卡顿,导致订单状态消息一直改不了,库存优先到期,查订单状态新建,什么都不处理
* 导致卡顿的订单,永远都不能解锁库存
* @param orderTo
*/
@Transactional(rollbackFor = Exception.class)
@Override
public void unlockStock(OrderTo orderTo) {
String orderSn = orderTo.getOrderSn();
//查一下最新的库存解锁状态,防止重复解锁库存
WareOrderTaskEntity orderTaskEntity = wareOrderTaskService.getOrderTaskByOrderSn(orderSn);
//按照工作单的id找到所有 没有解锁的库存,进行解锁(lock_status=1表示已锁定库存)
Long id = orderTaskEntity.getId();
List<WareOrderTaskDetailEntity> list = wareOrderTaskDetailService.list(new QueryWrapper<WareOrderTaskDetailEntity>()
.eq("task_id", id).eq("lock_status", 1));
for (WareOrderTaskDetailEntity taskDetailEntity : list) {
//解锁库存
unLockStock(taskDetailEntity.getSkuId(),
taskDetailEntity.getWareId(),
taskDetailEntity.getSkuNum(),
taskDetailEntity.getId());
}
}
自动解库存
/**
* (这个方法是由StockReleaseListener监听器调用的)
* 锁库存失败后的自动解锁(也就是回溯)
* @param to
*/
@Override
public void unlockStock(StockLockedTo to) {
//获取库存详细工作单类
StockDetailTo detail = to.getDetailTo();
//库存详细工作单的id
Long detailId = detail.getId();
//WareOrderTaskDetailEntity是库存详细工作单类
WareOrderTaskDetailEntity taskDetailInfo = wareOrderTaskDetailService.getById(detailId);
if (taskDetailInfo != null) {
//查出wms_ware_order_task工作单的信息
Long id = to.getId();
//订单锁库存工作单(获取哪个订单要锁库存)
WareOrderTaskEntity orderTaskInfo = wareOrderTaskService.getById(id);
//获取订单号查询订单状态
String orderSn = orderTaskInfo.getOrderSn();
//远程查询订单信息
R orderData = orderFeignService.getOrderStatus(orderSn);
if (orderData.getCode() == 0) {
//订单数据返回成功
OrderVo orderInfo = orderData.getData("data", new TypeReference<OrderVo>() {});
/**
* CREATE_NEW(0,"待付款"),
* PAYED(1,"已付款"),
* SENDED(2,"已发货"),
* RECIEVED(3,"已完成"),
* CANCLED(4,"已取消"),
* SERVICING(5,"售后中"),
* SERVICED(6,"售后完成");
*/
//订单不存在(因为有异常造成的数据回滚导致订单不存在)或者订单状态是取消状态(orderInfo.getStatus() == 4)才可解库存
if (orderInfo == null || orderInfo.getStatus() == 4) {
//当前库存工作单详情状态1,已锁定,只有当前库存工作单详情状态未解锁才可以解锁
if (taskDetailInfo.getLockStatus() == 1) {
//调用真正接库存的方法unLockStock
unLockStock(detail.getSkuId(),detail.getWareId(),detail.getSkuNum(),detailId);
}
}
} else {
//消息拒绝以后重新放在队列里面,让别人继续消费解锁
//远程调用服务失败
throw new RuntimeException("远程调用服务失败");
}
} else {
//无需解锁
}
}
/**
* 真正解锁库存的方法(自动解库存)
* @param skuId 需要解锁库存的商品id
* @param wareId 需要解锁库存的库存仓库id
* @param num 需要解锁库存的商品数量
* @param taskDetailId 库存工作单详情id
*/
public void unLockStock(Long skuId,Long wareId,Integer num,Long taskDetailId) {
//库存解锁(其实就是修改wms_ware_sku表中的stock_locked的值,之前锁库存锁了多少个就减去多少个)
wareSkuDao.unLockStock(skuId,wareId,num);
//更新工作单的状态
WareOrderTaskDetailEntity taskDetailEntity = new WareOrderTaskDetailEntity();
taskDetailEntity.setId(taskDetailId);
//setLockStatus(2)表示变为已解锁(1表示已锁定,2表示已解锁,3表示减扣)
taskDetailEntity.setLockStatus(2);
wareOrderTaskDetailService.updateById(taskDetailEntity);
}
- 自动解库存的具体实现流程
-
- 获取库存详细工作单的id
package com.saodai.common.to.mq;
import lombok.Data;
/**
* 发送到mq消息队列的to
**/
@Data
public class StockLockedTo {
/** 库存工作单的id **/
private Long id;
/** 工作单详情的所有信息 **/
private StockDetailTo detailTo;
}
package com.saodai.common.to.mq;
import lombok.Data;
/**
* 其实就是库存工作单详情实体类(具体给订单的哪个商品锁库存)
**/
@Data
public class StockDetailTo {
private Long id;
/**
* sku_id
*/
private Long skuId;
/**
* sku_name
*/
private String skuName;
/**
* 购买个数
*/
private Integer skuNum;
/**
* 工作单id
*/
private Long taskId;
/**
* 仓库id
*/
private Long wareId;
/**
* 锁定状态
*/
private Integer lockStatus;
}
-
- 查询数据库有没有这个库存详细工作单类
package com.saodai.saodaimall.ware.entity;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
/**
* 库存工作单详情(具体给订单的哪个商品锁库存)
*/
@NoArgsConstructor
@AllArgsConstructor
@Builder
@Data
@TableName("wms_ware_order_task_detail")
public class WareOrderTaskDetailEntity implements Serializable {
private static final long serialVersionUID = 1L;
/**
* id
*/
@TableId
private Long id;
/**
* sku_id
*/
private Long skuId;
/**
* sku_name
*/
private String skuName;
/**
* 购买个数
*/
private Integer skuNum;
/**
* 工作单id
*/
private Long taskId;
/**
* 仓库id
*/
private Long wareId;
/**
* 锁定状态
*/
private Integer lockStatus;
}
-
- 查询订单锁库存工作单(获取哪个订单要锁库存)
package com.saodai.saodaimall.ware.entity;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import java.io.Serializable;
import java.util.Date;
/**
* 订单锁库存工作单(表示我准备要给哪个订单锁库存了)
*/
@Data
@TableName("wms_ware_order_task")
public class WareOrderTaskEntity implements Serializable {
private static final long serialVersionUID = 1L;
/**
* id
*/
@TableId
private Long id;
/**
* order_id
*/
private Long orderId;
/**
* order_sn
*/
private String orderSn;
/**
* 收货人
*/
private String consignee;
/**
* 收货人电话
*/
private String consigneeTel;
/**
* 配送地址
*/
private String deliveryAddress;
/**
* 订单备注
*/
private String orderComment;
/**
* 付款方式【 1:在线付款 2:货到付款】
*/
private Integer paymentWay;
/**
* 任务状态
*/
private Integer taskStatus;
/**
* 订单描述
*/
private String orderBody;
/**
* 物流单号
*/
private String trackingNo;
/**
* create_time
*/
private Date createTime;
/**
* 仓库id
*/
private Long wareId;
/**
* 工作单备注
*/
private String taskComment;
}
-
- 根据订单号远程查询订单
package com.saodai.saodaimall.ware.vo;
import lombok.Data;
import java.math.BigDecimal;
import java.util.Date;
@Data
public class OrderVo {
private Long id;
/**
* member_id
*/
private Long memberId;
/**
* 订单号
*/
private String orderSn;
/**
* 使用的优惠券
*/
private Long couponId;
/**
* create_time
*/
private Date createTime;
/**
* 用户名
*/
private String memberUsername;
/**
* 订单总额
*/
private BigDecimal totalAmount;
/**
* 应付总额
*/
private BigDecimal payAmount;
/**
* 运费金额
*/
private BigDecimal freightAmount;
/**
* 促销优化金额(促销价、满减、阶梯价)
*/
private BigDecimal promotionAmount;
/**
* 积分抵扣金额
*/
private BigDecimal integrationAmount;
/**
* 优惠券抵扣金额
*/
private BigDecimal couponAmount;
/**
* 后台调整订单使用的折扣金额
*/
private BigDecimal discountAmount;
/**
* 支付方式【1->支付宝;2->微信;3->银联; 4->货到付款;】
*/
private Integer payType;
/**
* 订单来源[0->PC订单;1->app订单]
*/
private Integer sourceType;
/**
* 订单状态【0->待付款;1->待发货;2->已发货;3->已完成;4->已关闭;5->无效订单】
*/
private Integer status;
/**
* 物流公司(配送方式)
*/
private String deliveryCompany;
/**
* 物流单号
*/
private String deliverySn;
/**
* 自动确认时间(天)
*/
private Integer autoConfirmDay;
/**
* 可以获得的积分
*/
private Integer integration;
/**
* 可以获得的成长值
*/
private Integer growth;
/**
* 发票类型[0->不开发票;1->电子发票;2->纸质发票]
*/
private Integer billType;
/**
* 发票抬头
*/
private String billHeader;
/**
* 发票内容
*/
private String billContent;
/**
* 收票人电话
*/
private String billReceiverPhone;
/**
* 收票人邮箱
*/
private String billReceiverEmail;
/**
* 收货人姓名
*/
private String receiverName;
/**
* 收货人电话
*/
private String receiverPhone;
/**
* 收货人邮编
*/
private String receiverPostCode;
/**
* 省份/直辖市
*/
private String receiverProvince;
/**
* 城市
*/
private String receiverCity;
/**
* 区
*/
private String receiverRegion;
/**
* 详细地址
*/
private String receiverDetailAddress;
/**
* 订单备注
*/
private String note;
/**
* 确认收货状态[0->未确认;1->已确认]
*/
private Integer confirmStatus;
/**
* 删除状态【0->未删除;1->已删除】
*/
private Integer deleteStatus;
/**
* 下单时使用的积分
*/
private Integer useIntegration;
/**
* 支付时间
*/
private Date paymentTime;
/**
* 发货时间
*/
private Date deliveryTime;
/**
* 确认收货时间
*/
private Date receiveTime;
/**
* 评价时间
*/
private Date commentTime;
/**
* 修改时间
*/
private Date modifyTime;
}
-
- 进行双重判断
-
-
- 先判断订单不存在(因为有异常造成的数据回滚导致订单不存在)或者订单状态是取消状态
- 在判断当前库存工作单详情状态是不是1,1表示已锁定,只有当前库存工作单详情状态未解锁才可以解锁
-
-
- 调用unLockStock方法实现真正的解库存(自动解库存)
-
-
- 更新库存的数量(还原)
- 更新工作单的状态为已解锁
-
/**
* 真正解锁库存的方法(自动解库存)
* @param skuId 需要解锁库存的商品id
* @param wareId 需要解锁库存的库存仓库id
* @param num 需要解锁库存的商品数量
* @param taskDetailId 库存工作单详情id
*/
public void unLockStock(Long skuId,Long wareId,Integer num,Long taskDetailId) {
//库存解锁(其实就是修改wms_ware_sku表中的stock_locked的值,之前锁库存锁了多少个就减去多少个)
wareSkuDao.unLockStock(skuId,wareId,num);
//更新工作单的状态
WareOrderTaskDetailEntity taskDetailEntity = new WareOrderTaskDetailEntity();
taskDetailEntity.setId(taskDetailId);
//setLockStatus(2)表示变为已解锁(1表示已锁定,2表示已解锁,3表示减扣)
taskDetailEntity.setLockStatus(2);
wareOrderTaskDetailService.updateById(taskDetailEntity);
}
<!-- 解锁库存-->
<update id="unLockStock">
UPDATE wms_ware_sku
SET stock_locked = stock_locked - #{num}
WHERE
sku_id = ${skuId}
AND ware_id = #{wareId}
</update>
手动解库存
- 订单服务的订单取消后立马解库存的具体逻辑
-
- 首先通过订单号查询订单锁库存工作单
- 通过订单锁库存工作单的id去库存详细工作单去找对应的锁库存的记录,看有没有记录并且锁库存的状态是已锁定的状态,防止多次重复解库存(其中库存详细工作单中的工作id的值就是订单锁库存工作单的id的值)
- 最后调用真正的解库存方法来解库存
/**
* 真正解锁库存的方法(自动解库存)
* @param skuId 需要解锁库存的商品id
* @param wareId 需要解锁库存的库存仓库id
* @param num 需要解锁库存的商品数量
* @param taskDetailId 库存工作单详情id
*/
public void unLockStock(Long skuId,Long wareId,Integer num,Long taskDetailId) {
//库存解锁(其实就是修改wms_ware_sku表中的stock_locked的值,之前锁库存锁了多少个就减去多少个)
wareSkuDao.unLockStock(skuId,wareId,num);
//更新工作单的状态
WareOrderTaskDetailEntity taskDetailEntity = new WareOrderTaskDetailEntity();
taskDetailEntity.setId(taskDetailId);
//setLockStatus(2)表示变为已解锁(1表示已锁定,2表示已解锁,3表示减扣)
taskDetailEntity.setLockStatus(2);
wareOrderTaskDetailService.updateById(taskDetailEntity);
}
/**
* 订单取消了就立马解库存
* 防止订单服务卡顿,导致订单状态消息一直改不了,库存优先到期,查订单状态新建,什么都不处理
* 导致卡顿的订单,永远都不能解锁库存
* @param orderTo
*/
@Transactional(rollbackFor = Exception.class)
@Override
public void unlockStock(OrderTo orderTo) {
String orderSn = orderTo.getOrderSn();
//查一下最新的库存解锁状态,防止重复解锁库存
WareOrderTaskEntity orderTaskEntity = wareOrderTaskService.getOrderTaskByOrderSn(orderSn);
//按照工作单的id找到所有 没有解锁的库存,进行解锁(lock_status=1表示已锁定库存)
Long id = orderTaskEntity.getId();
List<WareOrderTaskDetailEntity> list = wareOrderTaskDetailService.list(new QueryWrapper<WareOrderTaskDetailEntity>()
.eq("task_id", id).eq("lock_status", 1));
for (WareOrderTaskDetailEntity taskDetailEntity : list) {
//解锁库存
unLockStock(taskDetailEntity.getSkuId(),
taskDetailEntity.getWareId(),
taskDetailEntity.getSkuNum(),
taskDetailEntity.getId());
}
}