一、基本配置
- 导入依赖
<!--高级消息队列协议amqp-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
- application.yml配置
#rabbitmq
spring:
  rabbitmq:
    host: 192.168.56.10
    virtual-host: /
    port: 5672
- 启动类添加注解
@EnableRabbit
- 配置mq的json序列化
/**
 * RabbitMQ message-conversion configuration.
 *
 * <p>This class deliberately does not inject {@code RabbitTemplate}. The
 * auto-configured template depends on the {@link MessageConverter} bean
 * declared here, so injecting the template back into this class would form a
 * circular bean dependency (template -> converter -> this configuration ->
 * template).
 */
@Configuration
public class RabbitmqConfig {

    /**
     * Provides a Jackson-based converter so that message payloads are
     * written and read as JSON.
     *
     * @return a new {@link Jackson2JsonMessageConverter}
     */
    @Bean
    public MessageConverter RabbitmqConvertJSON(){
        return new Jackson2JsonMessageConverter();
    }
}
- 配置交换机、队列、绑定规则
/**
 * Declares the topic exchange {@code stock-event-exchange}.
 *
 * @return a durable, non-auto-delete topic exchange
 */
@Bean
public Exchange StockEventExchange(){
    // Constructor shape: TopicExchange(String name, boolean durable, boolean autoDelete)
    final boolean durable = true;
    final boolean autoDelete = false;
    return new TopicExchange("stock-event-exchange", durable, autoDelete);
}
/**
 * Declares the queue {@code stock.release.stock.queue}.
 *
 * @return a durable, non-exclusive, non-auto-delete queue
 */
@Bean
public Queue StockReleaseStockQueue(){
    // Constructor shape: Queue(String name, boolean durable, boolean exclusive, boolean autoDelete)
    final boolean durable = true;
    final boolean exclusive = false;
    final boolean autoDelete = false;
    return new Queue("stock.release.stock.queue", durable, exclusive, autoDelete);
}
/**
 * Declares the delay queue {@code stock.delay.queue}.
 *
 * <p>The queue is created with a message TTL of 60000 and dead-letters to
 * {@code stock-event-exchange} using routing key {@code stock.release}.
 *
 * @return a durable, non-exclusive, non-auto-delete queue with the
 *         arguments described above
 */
@Bean
public Queue StockDelayQueue(){
    final Map<String,Object> arguments = new HashMap<>();
    // time to live
    arguments.put("x-message-ttl", 60000);
    // exchange that receives dead-lettered messages
    arguments.put("x-dead-letter-exchange", "stock-event-exchange");
    // routing key applied when a message is dead-lettered
    arguments.put("x-dead-letter-routing-key", "stock.release");

    final boolean durable = true;
    final boolean exclusive = false;
    final boolean autoDelete = false;
    return new Queue("stock.delay.queue", durable, exclusive, autoDelete, arguments);
}
/**
 * Binds {@code stock.delay.queue} to {@code stock-event-exchange} with
 * routing key {@code stock.locked}.
 *
 * @return the queue binding, with no extra binding arguments
 */
@Bean
public Binding StockLocked(){
    // Constructor shape: Binding(String destination, Binding.DestinationType destinationType,
    //                            String exchange, String routingKey, Map<String, Object> arguments)
    final String destination = "stock.delay.queue";
    final String exchange = "stock-event-exchange";
    final String routingKey = "stock.locked";
    return new Binding(destination, Binding.DestinationType.QUEUE, exchange, routingKey, null);
}
/**
 * Binds {@code stock.release.stock.queue} to {@code stock-event-exchange}
 * with the routing pattern {@code stock.release.#}.
 *
 * @return the queue binding, with no extra binding arguments
 */
@Bean
public Binding StockRelease(){
    final String destination = "stock.release.stock.queue";
    final String exchange = "stock-event-exchange";
    final String routingPattern = "stock.release.#";
    return new Binding(destination, Binding.DestinationType.QUEUE, exchange, routingPattern, null);
}
- 声明一个监听器以建立与mq的连接;连接建立后,上面声明的交换机、队列、绑定规则才会在mq中生成
/**
 * Placeholder listener on {@code stock.release.stock.queue}.
 *
 * <p>The body is intentionally empty: in these notes the listener exists
 * only so that a connection is established, which is what causes the
 * declared exchange, queues and bindings to be created.
 *
 * @param message the raw delivered message (unused)
 * @param channel the channel the message arrived on (unused)
 */
@RabbitListener(queues = "stock.release.stock.queue")
public void ListenQueue(Message message, Channel channel){
    // nothing to do
}
二、订单服务使用mq
1.生成队列、交换机、绑定规则
/**
 * Declares the order service's exchange, queues and bindings.
 *
 * <p>Restarting the service does not overwrite a queue that already exists.
 * If a queue was created with wrong settings, delete that queue and then
 * restart the service.
 */
@Configuration
public class MyMQConfig {

    private static final String ORDER_EVENT_EXCHANGE = "order-event-exchange";
    private static final String ORDER_DELAY_QUEUE = "order.delay.order";
    private static final String ORDER_RELEASE_QUEUE = "order.release.order.queue";

    /**
     * Declares the delay queue {@code order.delay.order}.
     *
     * <p>The queue is created with a message TTL of 30000 and dead-letters to
     * {@code order-event-exchange} using routing key
     * {@code order.release.order}.
     *
     * @return a durable, non-exclusive, non-auto-delete queue
     */
    @Bean
    public Queue OrderDelayQueue(){
        final Map<String,Object> arguments = new HashMap<>();
        // time to live
        arguments.put("x-message-ttl", 30000);
        // exchange that receives dead-lettered messages
        arguments.put("x-dead-letter-exchange", ORDER_EVENT_EXCHANGE);
        // routing key applied when a message is dead-lettered
        arguments.put("x-dead-letter-routing-key", "order.release.order");

        // Constructor shape: Queue(String name, boolean durable, boolean exclusive,
        //                          boolean autoDelete, Map<String, Object> arguments)
        final boolean durable = true;
        final boolean exclusive = false;
        final boolean autoDelete = false;
        return new Queue(ORDER_DELAY_QUEUE, durable, exclusive, autoDelete, arguments);
    }

    /**
     * Declares the queue {@code order.release.order.queue}.
     *
     * @return a durable, non-exclusive, non-auto-delete queue without arguments
     */
    @Bean
    public Queue OrderReleaseOrderQueue(){
        final boolean durable = true;
        final boolean exclusive = false;
        final boolean autoDelete = false;
        return new Queue(ORDER_RELEASE_QUEUE, durable, exclusive, autoDelete, null);
    }

    /**
     * Declares the topic exchange {@code order-event-exchange}.
     *
     * <p>A topic exchange was chosen because several queues need to be bound
     * to it.
     *
     * @return a durable, non-auto-delete topic exchange without arguments
     */
    @Bean
    public Exchange OrderEventExchange(){
        // Constructor shape: TopicExchange(String name, boolean durable, boolean autoDelete,
        //                                  Map<String, Object> arguments)
        final boolean durable = true;
        final boolean autoDelete = false;
        return new TopicExchange(ORDER_EVENT_EXCHANGE, durable, autoDelete, null);
    }

    /**
     * Binds {@code order.delay.order} to {@code order-event-exchange} with
     * routing key {@code order.create.order}.
     *
     * @return the queue binding, with no extra binding arguments
     */
    @Bean
    public Binding OrderCreateOrder(){
        // Constructor shape: Binding(String destination, Binding.DestinationType destinationType,
        //                            String exchange, String routingKey, Map<String, Object> arguments)
        final String routingKey = "order.create.order";
        return new Binding(ORDER_DELAY_QUEUE, Binding.DestinationType.QUEUE, ORDER_EVENT_EXCHANGE, routingKey, null);
    }

    /**
     * Binds {@code order.release.order.queue} to {@code order-event-exchange}
     * with routing key {@code order.release.order}.
     *
     * @return the queue binding, with no extra binding arguments
     */
    @Bean
    public Binding OrderReleaseOrder(){
        final String routingKey = "order.release.order";
        return new Binding(ORDER_RELEASE_QUEUE, Binding.DestinationType.QUEUE, ORDER_EVENT_EXCHANGE, routingKey, null);
    }
}
2.生产者发送消息
/**
 * Test endpoint that publishes a freshly created order to
 * {@code order-event-exchange} with routing key {@code order.create.order}.
 *
 * @return a fixed confirmation string once the send call has returned
 */
@ResponseBody
@GetMapping("/test/queue")
public String testQueue(){
    final OrderEntity payload = new OrderEntity();
    final String orderSn = UUID.randomUUID().toString();
    payload.setOrderSn(orderSn);

    // Publish to the exchange with an explicit routing key:
    // convertAndSend(String exchange, String routingKey, Object object)
    final String exchange = "order-event-exchange";
    final String routingKey = "order.create.order";
    rabbitTemplate.convertAndSend(exchange, routingKey, payload);

    return "给mq发消息完成";
}
3.订阅队列
/**
 * Consumes messages from {@code order.release.order.queue}.
 *
 * <p>The message is acknowledged only after it has been handled. If handling
 * fails, the message is rejected with requeue so it is not lost, and the
 * failure is rethrown so it stays visible to the caller.
 *
 * <p>NOTE(review): a message that always fails will be redelivered
 * repeatedly; consider a retry limit or a dead-letter queue.
 *
 * @param channel     channel the message arrived on, used for manual ack/reject
 * @param message     raw delivered message, source of the delivery tag
 * @param orderEntity converted message payload
 * @throws IOException if the ack or reject call on the channel fails
 */
@RabbitListener(queues = {"order.release.order.queue"})
public void ListenQueue(Channel channel, Message message, OrderEntity orderEntity) throws IOException {
    final long deliveryTag = message.getMessageProperties().getDeliveryTag();
    try {
        System.out.println("手动过期的订单信息,准备关闭订单:"+orderEntity.getOrderSn());
    } catch (RuntimeException e) {
        // Handling failed: hand the message back to the broker instead of losing it.
        // basicReject(long deliveryTag, boolean requeue)
        channel.basicReject(deliveryTag, true);
        throw e;
    }
    // Manual ack is configured, so the message must be acknowledged explicitly,
    // and only once handling has succeeded.
    // basicAck(long deliveryTag, boolean multiple)
    channel.basicAck(deliveryTag, false);
}
三、库存服务使用mq
/**
 * RabbitMQ configuration for the stock service: JSON message conversion plus
 * the exchange, queues and bindings used for delayed stock release.
 *
 * <p>This class deliberately does not inject {@code RabbitTemplate}. The
 * auto-configured template depends on the {@link MessageConverter} bean
 * declared here, so injecting the template back into this class would form a
 * circular bean dependency (template -> converter -> this configuration ->
 * template).
 */
@Configuration
public class RabbitmqConfig {

    /**
     * Placeholder listener on {@code stock.release.stock.queue}; the body is
     * intentionally empty.
     *
     * @param message the raw delivered message (unused)
     * @param channel the channel the message arrived on (unused)
     */
    @RabbitListener(queues = {"stock.release.stock.queue"})
    public void ListenQueue(Message message, Channel channel){
    }

    /**
     * Provides a Jackson-based converter so that message payloads are
     * written and read as JSON.
     *
     * @return a new {@link Jackson2JsonMessageConverter}
     */
    @Bean
    public MessageConverter RabbitmqConvertJSON(){
        return new Jackson2JsonMessageConverter();
    }

    /**
     * Declares the topic exchange {@code stock-event-exchange}.
     *
     * @return a durable, non-auto-delete topic exchange
     */
    @Bean
    public Exchange StockEventExchange(){
        // TopicExchange(String name, boolean durable, boolean autoDelete)
        return new TopicExchange("stock-event-exchange",true,false);
    }

    /**
     * Declares the queue {@code stock.release.stock.queue}.
     *
     * @return a durable, non-exclusive, non-auto-delete queue
     */
    @Bean
    public Queue StockReleaseStockQueue(){
        // Queue(String name, boolean durable, boolean exclusive, boolean autoDelete)
        return new Queue("stock.release.stock.queue",true,false,false);
    }

    /**
     * Declares the delay queue {@code stock.delay.queue}.
     *
     * <p>The queue is created with a message TTL of 60000 and dead-letters to
     * {@code stock-event-exchange} using routing key {@code stock.release}.
     *
     * @return a durable, non-exclusive, non-auto-delete queue
     */
    @Bean
    public Queue StockDelayQueue(){
        Map<String,Object> arguments = new HashMap<>();
        // exchange that receives dead-lettered messages
        arguments.put("x-dead-letter-exchange","stock-event-exchange");
        // routing key applied when a message is dead-lettered
        arguments.put("x-dead-letter-routing-key","stock.release");
        // time to live
        arguments.put("x-message-ttl",60000);
        return new Queue("stock.delay.queue",true,false,false,arguments);
    }

    /**
     * Binds {@code stock.delay.queue} to {@code stock-event-exchange} with
     * routing key {@code stock.locked}.
     *
     * @return the queue binding, with no extra binding arguments
     */
    @Bean
    public Binding StockLocked(){
        // Binding(String destination, Binding.DestinationType destinationType, String exchange, String routingKey, Map<String, Object> arguments)
        return new Binding("stock.delay.queue", Binding.DestinationType.QUEUE,"stock-event-exchange","stock.locked",null);
    }

    /**
     * Binds {@code stock.release.stock.queue} to {@code stock-event-exchange}
     * with the routing pattern {@code stock.release.#}.
     *
     * @return the queue binding, with no extra binding arguments
     */
    @Bean
    public Binding StockRelease(){
        return new Binding("stock.release.stock.queue",Binding.DestinationType.QUEUE,"stock-event-exchange","stock.release.#",null);
    }
}