目录
- 前言
- 工作流程-灵魂画手
- 名词解释
- 交换机类型
- 一、安装
- 1.1 [RabbitMQ官网安装](https://www.rabbitmq.com/download.html)
- 1.2 Docker安装并启动
- 二、食用教程
- 2.1.导入依赖
- 2.2 添加配置
- 2.3 代码实现
- 2.3.1 直连(Direct)类型
- 2.3.2 引入消息手动确认机制
- 2.3.2 广播(Fanout)类型
- 2.3.3 主题(Topic)类型
- 三、实战应用场景
- 3.1 如何控制消息有序
- 3.2 保证消息不被重复消费(幂等性)
- 3.3 保证消息的可靠性
- 3.4 死信队列解决订单超时未支付
- 总结
前言
RabbitMQ是一个由erlang语言开发,实现了AMQP(Advanved Message Queue Protocol)高级消息队列协议的消息服务中间件。
工作流程-灵魂画手
1、生产者(Producer)和消费者(Consumer)都需要在与RabbitMQ建立长连接(Connection)的前提下,才能收发消息
2、客户端(生产者、消费者)和服务端(RabbitMQ)只能建立一条长连接,在长连接中开辟一条条的信道进行收发消息
3、生产者发送消息,消息到达Broker指定虚拟主机(服务会配置)的指定交换机(发送消息会指定),根据路由键和交换机与队列的绑定关系,把消息发送给对应的队列
3、消费者通过信道监听队列,消息进入队列就可以被消费者实时拿到
名词解释
1、Broker (message broker) 消息代理:消息队列服务器实体,简单理解为邮局,寄收件都要通过它。
2、JMS(Java Message Service)JAVA消息服务。是基于JVM消息代理的规范。ActiveMQ、HornetMQ是JMS实现
3、AMQP(Advanced Message Queuing Protocol)
高级消息队列协议,也是一个消息代理的规范,兼容JMS
RabbitMQ是AMQP的实现
4、Message 消息,由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。
5、Producer消息的生产者,也是一个向交换器发布消息的客户端应用程序。
6、Exchange 交换机 ,用来接收生产者发送的消息,并将这些消息路由给服务器中的队列。
Exchange常用有3种类型:direct、fanout、 topic、不同类型的Exchange转发消息的策略有所区别
7、Queue 消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。
8、Binding绑定,用于消息队列和交换机之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。
Exchange 和 Queue 的绑定可以是多对多的关系。
9、Connection 网络连接,比如一个TCP连接。
10、Channel 信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内的虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。
11、Consumer 消费者,表示一个从消息队列中取得消息的客户端应用程序。
12、Virtual Host 虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。
交换机类型
一、安装
1.1 RabbitMQ官网安装
1.2 Docker安装并启动
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.11-management
# 开机自启
docker update rabbitmq --restart=always
● 5672 (AMQP端口)
● 15672 (web管理后台端口)
本地安装可通过:http://127.0.0.1:15672/访问,用户名密码默认guest
二、食用教程
2.1.导入依赖
<!--RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.2 添加配置
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: root #用户名 默认guest
password: root #密码 默认guest
virtual-host: springboot-test #虚拟主机 默认/
2.3 代码实现
2.3.1 直连(Direct)类型
直连型交换机,根据消息携带的路由键将消息投递给对应队列。
直连类型初始化配置
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* 1、直连交换机配置
*/
@Configuration
public class DirectRabbitConfig {
public static final String DIRECT_QUEUE = "===DirectQueue===";
public static final String DIRECT_EXCHANGE = "===DirectExchange===";
public static final String DIRECT_ROUTING = "===DirectRouting===";
/**
* durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
* exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
* autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
* return new Queue("TestDirectQueue",true,true,false);
*/
@Bean
public Queue directQueue() {
return new Queue(DIRECT_QUEUE,false);
}
@Bean
DirectExchange directExchange() {
return new DirectExchange(DIRECT_EXCHANGE,false,false);
}
@Bean
Binding binding() {
return BindingBuilder.bind(directQueue())
.to(directExchange()).with(DIRECT_ROUTING);
}
}
该配置主要把队列、交换机、绑定都交由spring管理,记得声明队列、交换机、建立绑定关系。消息指定交换机发送后,交换机就可以根据路由键把消息发送到匹配的队列上。
消费者
import com.chendi.springboot_rabbitmq.config.DirectRabbitConfig;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Slf4j
@Component
public class DirectReceiver {
@RabbitListener(queues = DirectRabbitConfig.DIRECT_QUEUE)
public void receiver(String dataMsg) {
log.info("接收者A dataMsg:{} ",dataMsg);
}
@RabbitListener(queues = DirectRabbitConfig.DIRECT_QUEUE)
public void receiver(String dataMsg) {
log.info("接收者B dataMsg:{} ",dataMsg);
}
}
生产者
@RestController
@RequiredArgsConstructor
public class RabbitMQTestController {
final RabbitTemplate rabbitTemplate;
@GetMapping("/sendDirectMessage")
public String sendDirectMessage() {
for (int i = 0; i < 10; i++) {
String messageData = "Hello World!" + i;
//可自定义消息体类型
rabbitTemplate.convertAndSend(DirectRabbitConfig.DIRECT_EXCHANGE, DirectRabbitConfig.DIRECT_ROUTING, messageData);
}
return "发送完成";
}
}
运行发现:默认情况下,RabbitMQ轮询分发将按顺序将每个消息发送给下一个使用者。有如下缺点:
1、无法保证消息已被消费
2、处理消息快的服务得到的消息和处理消息慢的服务是一样多的(公平分发、能者多劳)。
2.3.2 引入消息手动确认机制
配置文件
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual # 设置消费端手动 ack
#表示消费者端每次从队列拉取多少个消息进行消费,直到手动确认消费完毕后,才会继续拉取下一条
prefetch: 1 # 预加载消息数量--QOS
消费者应答
@Slf4j
@Component
public class DirectReceiver {
@RabbitListener(queues = DirectRabbitConfig.DIRECT_QUEUE)
public void receiver(String dataMsg, Channel channel, Message message) throws IOException, InterruptedException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
Thread.sleep(1000);
log.info("接收者A deliveryTag:{} dataMsg:{} ",deliveryTag ,dataMsg);
channel.basicAck(deliveryTag,true);
}
@RabbitListener(queues = DirectRabbitConfig.DIRECT_QUEUE)
public void receiver2(String dataMsg, Channel channel, Message message) throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
log.info("接收者B deliveryTag:{} dataMsg:{} ",deliveryTag ,dataMsg);
channel.basicAck(deliveryTag,true);
}
}
回执方法(
1、channel.basicAck表示成功确认,使用此回执方法后,消息会被rabbitmq broker 删除。
2、channel.basicNack表示失败确认,一般在消费消息业务异常时用到此方法、可决定消息是否重新入列
3、channel.basicReject 拒绝消息,与basicNack区别在于不能进行批量操作,其他用法很相似。
2.3.2 广播(Fanout)类型
扇型交换机,这个交换机没有路由键概念,这个交换机在接收到消息后,会直接转发到绑定到它上面的所有队列。
广播类型配置类
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* 2、广播、扇出交换机
*/
@Configuration
public class FanoutRabbitConfig {
public final static String FANOUT_EXCHANGE = "fanoutExchange";
public static final String FANOUT_QUEUE_A = "fanoutQueueA";
public static final String FANOUT_QUEUE_B = "fanoutQueueB";
public static final String FANOUT_QUEUE_C = "fanoutQueueC";
/**
* 创建三个队列
* 将三个队列都绑定在交换机 fanoutExchange 上
* 因为是扇型交换机, 路由键无需配置,配置也不起作用
*/
@Bean
public Queue queueA() {
return new Queue(FANOUT_QUEUE_A);
}
@Bean
public Queue queueB() {
return new Queue(FANOUT_QUEUE_B);
}
@Bean
public Queue queueC() {
return new Queue(FANOUT_QUEUE_C);
}
@Bean
FanoutExchange fanoutExchange() {
return new FanoutExchange(FANOUT_EXCHANGE);
}
@Bean
Binding bindingExchangeA() {
return BindingBuilder.bind(queueA()).to(fanoutExchange());
}
@Bean
Binding bindingExchangeB() {
return BindingBuilder.bind(queueB()).to(fanoutExchange());
}
@Bean
Binding bindingExchangeC() {
return BindingBuilder.bind(queueC()).to(fanoutExchange());
}
}
消费者
import com.chendi.springboot_rabbitmq.config.FanoutRabbitConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
//如果开启了消息手动确认机制,一定要记得应答消息噢
//不然消息会一直堆积在mq里
@Slf4j
@Component
public class FanoutReceiver {
@RabbitListener(queues = FanoutRabbitConfig.FANOUT_QUEUE_A)
public void fanout_A(String message) {
log.info("fanout_A {}" , message);
}
@RabbitListener(queues = FanoutRabbitConfig.FANOUT_QUEUE_B)
public void fanout_B(String message) {
log.info("fanout_B {}" , message);
}
@RabbitListener(queues = FanoutRabbitConfig.FANOUT_QUEUE_C)
public void fanout_C(String message) {
log.info("fanout_C {}" , message);
}
}
测试生产者 Controller加上
@GetMapping("/sendFanoutMessage")
public String sendFanoutMessage() {
String messageData = "这是一条广播消息";
rabbitTemplate.convertAndSend(FanoutRabbitConfig.FANOUT_EXCHANGE, "", messageData);
return "发送完成";
}
2.3.3 主题(Topic)类型
主题交换机,特点就是在它的路由键和绑定键之间是有规则的。
「*」 (星号) 用来表示一个单词 (必须出现的)
「#」 (井号) 用来表示任意数量(零个或多个)单词
主题交换机不绑定路由键时是直连交换机,绑定「#」号时是扇形交换机。
主题模式配置类
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* 主题交换机
* 转发规则:
* #:匹配一个或者多个词
* *:匹配一个或者0个词
* 比如 有msg.# 、msg.* 匹配规则
* msg.# 会匹配 msg.email、msg.email.b、msg.email.a
* msg.* 只会匹配 msg.email 和 msg ,
*/
@Configuration
public class TopicRabbitConfig {
//绑定键
public final static String MSG_EMAIL = "msg.email";
public final static String MSG_EMAIL_A = "msg.email.a";
public final static String MSG_SMS = "msg.sms";
public final static String TOPIC_EXCHANGE = "topicExchange";
@Bean
public Queue firstQueue() {
return new Queue(TopicRabbitConfig.MSG_EMAIL);
}
@Bean
public Queue secondQueue() {
return new Queue(TopicRabbitConfig.MSG_EMAIL_A);
}
@Bean
public Queue thirdQueue() {
return new Queue(TopicRabbitConfig.MSG_SMS);
}
@Bean
TopicExchange exchange() {
return new TopicExchange(TOPIC_EXCHANGE);
}
@Bean
Binding bindingExchangeMessage() {
return BindingBuilder.bind(firstQueue()).to(exchange()).with(MSG_EMAIL);
}
@Bean
Binding bindingExchangeMessage2() {
return BindingBuilder.bind(secondQueue()).to(exchange()).with("msg.#");
}
@Bean
Binding bindingExchangeMessage3() {
return BindingBuilder.bind(thirdQueue()).to(exchange()).with("msg.*");
}
}
消费者
import com.chendi.springboot_rabbitmq.config.TopicRabbitConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class TopicReceiver {
@RabbitListener(queues = TopicRabbitConfig.MSG_EMAIL)
public void topic_man(String message) {
log.info("队列{} 收到消息:{}" ,TopicRabbitConfig.MSG_EMAIL, message);
}
@RabbitListener(queues = TopicRabbitConfig.MSG_SMS)
public void topic_woman(String message) {
log.info("队列{} 收到消息:{}" ,TopicRabbitConfig.MSG_SMS, message);
}
@RabbitListener(queues = TopicRabbitConfig.MSG_EMAIL_A)
public void xxx(String message) {
log.info("队列{} 收到消息:{}" ,TopicRabbitConfig.MSG_EMAIL_A, message);
}
}
测试生产者 Controller加上
@GetMapping("/sendTopicMessage")
public String sendTopicMessage() {
rabbitTemplate.convertAndSend(TopicRabbitConfig.TOPIC_EXCHANGE, TopicRabbitConfig.MSG_EMAIL, "Hello Topic!所有队列都可以收到这条信息");
rabbitTemplate.convertAndSend(TopicRabbitConfig.TOPIC_EXCHANGE, TopicRabbitConfig.MSG_EMAIL_A, "只有 msg.email.a可以收到这条信息");
rabbitTemplate.convertAndSend(TopicRabbitConfig.TOPIC_EXCHANGE, TopicRabbitConfig.MSG_SMS, "msg.email.a 和 msg.sms可以收到这条信息");
return "发送完成";
}
如果开启了消息手动确认机制,一定要记得应答消息噢!!!
以上整合就完成了。
三、实战应用场景
3.1 如何控制消息有序
1、当只有一个消费者可以保证消息有序,但是效率低。
2、生产者顺序发送消息到队列但是多个消费者监听一个队列时会轮询分发导致乱序。修改为一个消费者只监听一个队列,生产者自定义投放策略,1、2、3投放到A队列,4、5、6投放到B队列(顺序的消息为一个整体)投放至一个队列。
3.2 保证消息不被重复消费(幂等性)
在消费者消费结束后,正常情况下会发送回执给消息队列,证明该消息已被消费。但是此时消费者网络传输故障或者宕机了,消息队列收不到消息被消费的回执会将消息再分发给其他消费者,进而导致消息被消费多次。
·······
解决方法:(具体问题具体分析)
1、在redis中维护一个set,生产者在发送消息前,加上全局唯一的id,消费者消费之前,去redis中查一下,看是否消费过,如果没有消费过则继续执行。
//生产者
public void sendMessageIde() {
MessageProperties properties = new MessageProperties();
properties.setMessageId(UUID.randomUUID().toString());
Message message = new Message("消息".getBytes(), properties);
rabbitTemplate.convertAndSend("exchange", "", message);
}
//消费者
@RabbitListener(queues = "queue")
@RabbitHandler
public void processIde(Message message, Channel channel) throws IOException {
if (stringRedisTemplate.opsForValue().setIfAbsent(message.getMessageProperties().getMessageId(),"1")){
// 业务操作...
System.out.println("消费消息:"+ new String(message.getBody(), "UTF-8"));
// 手动确认
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
3.3 保证消息的可靠性
消息发送流程
可以看出,生产者发送的消息准确抵达消费者分为两部分
1、发送端 :消息投递到Broker成功时回调confirmCallback,交换机投递到队列失败时回调returnCallback
2、消费端的ack
配置文件
spring:
rabbitmq:
publisher-returns: true # 开启消息抵达队列的确认
# 低版本 publisher-confirms: true
publisher-confirm-type: correlated # 开启发送端确认
配置类
/**
* 常用的三个配置如下
* 1---设置手动应答(acknowledge-mode: manual)
* 2---设置生产者消息发送的确认回调机制 ( #这个配置是保证提供者确保消息推送到交换机中,不管成不成功,都会回调
* publisher-confirm-type: correlated
* #保证交换机能把消息推送到队列中
* publisher-returns: true
* template:
* #以下是rabbitmqTemplate配置
* mandatory: true)
* 3---设置重试
*/
@Slf4j
@Configuration
public class RabbitConfig {
@Autowired
private ConnectionFactory rabbitConnectionFactory;
// 存在此名字的bean 自带的容器工厂会不加载(yml下rabbitmq下的template的配置),如果想自定义来区分开 需要改变bean 的名称
@Bean
public RabbitTemplate rabbitTemplate(){
RabbitTemplate rabbitTemplate=new RabbitTemplate(rabbitConnectionFactory);
//默认是用jdk序列化
//数据转换为json存入消息队列,方便可视化界面查看消息数据
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
//设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
rabbitTemplate.setMandatory(true);
//此处设置重试template后,会再生产者发送消息的时候,调用该template中的调用链
rabbitTemplate.setRetryTemplate(rabbitRetryTemplate());
rabbitTemplate.setConfirmCallback(
(correlationData, ack, cause) -> {
if(!ack){
System.out.println("ConfirmCallback "+"相关数据:"+ correlationData);
System.out.println("ConfirmCallback "+"确认情况:"+ ack);
System.out.println("ConfirmCallback "+"原因:"+ cause);
}
});
rabbitTemplate.setReturnsCallback((ReturnedMessage returned) -> {
System.out.println("ReturnsCallback: "+"消息:"+ returned.getMessage());
System.out.println("ReturnsCallback: "+"回应码:"+ returned.getReplyCode());
System.out.println("ReturnsCallback: "+"回应消息:"+ returned.getReplyText());
System.out.println("ReturnsCallback: "+"交换机:"+ returned.getExchange());
System.out.println("ReturnsCallback: "+"路由键:"+ returned.getRoutingKey());
});
return rabbitTemplate;
}
//重试的Template
@Bean
public RetryTemplate rabbitRetryTemplate() {
RetryTemplate retryTemplate = new RetryTemplate();
// 设置监听 调用重试处理过程
retryTemplate.registerListener(new RetryListener() {
@Override
public <T, E extends Throwable> boolean open(RetryContext retryContext, RetryCallback<T, E> retryCallback) {
// 执行之前调用 (返回false时会终止执行)
//log.info("执行之前调用 (返回false时会终止执行)");
return true;
}
@Override
public <T, E extends Throwable> void close(RetryContext retryContext, RetryCallback<T, E> retryCallback, Throwable throwable) {
// 方法结束的时候调用
if(retryContext.getRetryCount() > 0){
log.info("最后一次调用");
}
}
@Override
public <T, E extends Throwable> void onError(RetryContext retryContext, RetryCallback<T, E> retryCallback, Throwable throwable) {
// 方法异常时会调用
log.info("第{}次调用", retryContext.getRetryCount());
}
});
return retryTemplate;
}
}
发送端测试
import com.chendi.springboot_rabbitmq.config.DirectRabbitConfig;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;
import java.util.UUID;
@RestController
public class SendCallbackMessageController {
@Autowired
RabbitTemplate rabbitTemplate; //使用RabbitTemplate,这提供了接收/发送等等方法
@ResponseBody
@GetMapping("/sendMessageToExchangeFail")
public Object sendMessageToExchangeFail() {
String messageData = "这条消息不会到达交换机";
rabbitTemplate.convertAndSend("不存在的交换机", "", messageData, new CorrelationData(UUID.randomUUID().toString()));
return messageData;
}
@ResponseBody
@GetMapping("/sendMessageToQueueFail")
public Object sendMessageToQueueFail() {
String messageData = "这条消息不会到达队列";
rabbitTemplate.convertAndSend(DirectRabbitConfig.DIRECT_EXCHANGE, "不存在的路由键", messageData, new CorrelationData(UUID.randomUUID().toString()));
return messageData;
}
}
请求结果:
3.4 死信队列解决订单超时未支付
场景:当顾客购买一件商品存在的操作
生成订单 =》 扣减库存 =》 完成支付
当库存只剩1件时,A用户下单但是迟迟未支付,会导致B用户下单时,判断库存不足导致生成订单失败。
此时,就需要解决订单超时未支付的问题。
流程 :
初始化两组正常队列和交换机A、B,A组的初始化参数x-dead-letter-exchange、x-dead-letter-routing-key指向B组的交换机和路由键。意在,A中删除或过期的数据,可以放入指定交换机指定路由键的队列中。
-这样如果设置了订单超过5min未支付
发送方在发送消息时,指定过期时间为5 * 60 * 1000
时间过期后此消息会投递到队列B(死信队列)中,队列B根据订单id去判断是否支付,去做加库存等相应的操作。
死信队列配置类
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* 解决订单超时未支付的问题
*
* 创建两个队列
* 1、队列A(正常的队列只是设置了某些参数):设置队列中的超时未消费信息指定丢到对应的队列B
* 2、队列B(也是一个正常的队列),只是把超时的信息丢给它所以称呼为死信队列
*/
@Configuration
public class DeadLetterExchangeConfig {
/**
* x-message-tti(Time-To-Live)发送到队列的消息在丟弃之前可以存活多长时间(毫秒)
* x-max-length限制队列最大长度(新增后挤出最早的),单位个数
* x-expires队列没有访问超时时,自动删除(包含没有消费的消息),单位毫秒
* x-max-length-bytes限制队列最大容量
* x-dead-letter-exchange死信交换机,将删除/过期的数据,放入指定交换机
* x-dead-letter-routing-key死信路由,将删除/过期的数据,放入指定routingKey
* x-max-priority队列优先级
* x-queue-mode对列模式,默认lazy(将数据放入磁盘,消费时放入内存)
* x-queue-master-locator镜像队列
*/
@Bean
public Queue orderQueue(){
Map<String, Object> args = new HashMap<>(2);
// 绑定我们的死信交换机
args.put("x-dead-letter-exchange", "orderDeadExChange");
// 绑定我们的路由key
args.put("x-dead-letter-routing-key", "orderDeadRoutingKey");
return new Queue("orderQueue", true, false, false, args);
}
@Bean
public Queue orderDeadQueue(){
return new Queue("orderDeadQueue");
}
@Bean
public DirectExchange orderExchange(){
return new DirectExchange("orderExchange");
}
@Bean
public DirectExchange orderDeadExchange(){
return new DirectExchange("orderDeadExChange");
}
//绑定正常队列到交换机
@Bean
public Binding orderBindingExchange(Queue orderQueue, DirectExchange orderExchange) {
return BindingBuilder.bind(orderQueue).to(orderExchange).with("orderRoutingKey");
}
//绑定死信队列到死信交换机
@Bean
public Binding deadBindingExchange(Queue orderDeadQueue, DirectExchange orderDeadExchange) {
return BindingBuilder.bind(orderDeadQueue).to(orderDeadExchange).with("orderDeadRoutingKey");
}
}
消费者
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* 死信队列的消费者
*/
@Slf4j
@Component
public class DeadLetterReceiver {
@RabbitListener(queues = "orderDeadQueue")
public void orderDeadQueueReceiver(String dataMsg, Channel channel, Message message) {
try{
long deliveryTag = message.getMessageProperties().getDeliveryTag();
log.info("死信队列接收者A收到消息,根据订单id查询订单是否支付,未支付解锁库存 deliveryTag:{} dataMsg:{} ",deliveryTag ,dataMsg);
channel.basicAck(deliveryTag,false);
} catch (Exception e){
log.info("如果报错了,执行补偿机制");
}
}
}
生产者
@GetMapping("/createOrder")
public String createOrder() {
rabbitTemplate.convertAndSend("orderExchange", "orderRoutingKey", "我是订单json", message -> {
//设置过期时间10s
message.getMessageProperties().setExpiration("10000");
return message;
});
return "发送完成";
}
总结
MQ的应用场景:
- 异步处理(注册发邮件发短消息)
- 应用解耦(用户下单后,订单系统需要通知库存系统扣减库存,就算库存系统出现故障,消息队列也能保证消息的可靠投递,不会导致消息丢失.)
- 流量削峰(秒杀活动,一般会因为流量过大,导致应用挂掉,设置消息队列参数,如果长度超过最大值,则直接抛弃用户请求或跳转到错误页面)