八、死信队列
8.1、死信的三大来源
代码结构图:
架构图逻辑说明:
生产者发送消息至交换机(正常交换机),由交换机根据routing-key决定发送到哪个队列(正常队列),此时触发以下三种条件之一,正常队列将消息发送到死信交换机,并指明routing-key发往死信队列,再由死信队列消费者进行消费。
8.1.1、消息 TTL 过期
生产者:
private static final String EXCHANGE_NAME = "normal_exchange";
/**
* 设置过期时间导致消息死信
* @param args
* @throws Exception
*/
public static void main1(String[] args) throws Exception{
Channel channel = RabbitMqUtils.getChannel();
//死信消息 设置TTL时间 -》time to live 单位是ms
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
//发消息
for (int i = 0; i < 10; i++) {
String message = "info" + i;
channel.basicPublish(EXCHANGE_NAME,"zhangsan",properties,message.getBytes(StandardCharsets.UTF_8));
}
}
正常消费者:
/**
* 正常交换机名称
*/
private static final String NORMAL_EXCHANGE = "normal_exchange";
/**
* 死信交换机名称
*/
private static final String DEAD_EXCHANGE = "dead_exchange";
/**
* 交换机类型
*/
private static final String TYPE = "direct";
/**
* 普通队列名称
*/
private static final String NORMAL_QUEUE = "c1";
/**
* 死信队列名称
*/
private static final String DEAD_QUEUE = "c2";
public static void main(String[] args) throws Exception{
Channel channel = RabbitMqUtils.getChannel();
//声明普通交换机和死信交换机
channel.exchangeDeclare(NORMAL_EXCHANGE,TYPE);
channel.exchangeDeclare(DEAD_EXCHANGE,TYPE);
//声明普通队列
Map<String,Object> argument = new HashMap<>();
//正常的队列设置消息变成死信之后发送给哪个死信交换机
argument.put("x-dead-letter-exchange",DEAD_EXCHANGE);
//设置死信rountingKEY-》即死信消息发送给死信交换机后发往哪个死信队列
argument.put("x-dead-letter-routing-key","lisi");
channel.queueDeclare(NORMAL_QUEUE,false,false,false,argument);
//声明死信队列
channel.queueDeclare(DEAD_QUEUE,false,false,false,null);
//绑定普通交换机与队列
channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");
//绑定死信交换机与队列
channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");
System.out.println("C1等待接收消息......");
//声明 接收消息
DeliverCallback deliverCallback = (consumeTag, message) ->{
String msg = new String(message.getBody(),"UTF-8");
System.out.println(msg);
};
//取消消费的回调
CancelCallback cancelCallback = (consumeTag) -> {
System.out.println("消息消费被中断");
};
//开启手动应答
channel.basicConsume(NORMAL_QUEUE,false,deliverCallback,cancelCallback);
}
生产者和消费者线程启动后将消费者线程杀死,即消息无法被消费,等待消息过期后查看死信队列当中的消息数量或死信消费者的控制台输出。
注意:消息过期时间也可以由队列来决定,argument集合中添加如下参数即可:
//过期时间-》10s
argument.put("x-message-ttl",10_000);
8.1.2、队列达到最大长度(队列满了,无法再添加数据到 mq 中)
生产者:
public static void main(String[] args) throws Exception{
Channel channel = RabbitMqUtils.getChannel();
for (int i = 0; i < 10; i++) {
String message = "info" + i;
channel.basicPublish(EXCHANGE_NAME,"zhangsan",null,message.getBytes(StandardCharsets.UTF_8));
}
}
消费者:
声明队列时添加参数(其余代码与情况1代码一致):
//设置正常队列的最大容量
argument.put("x-max-length",6);
先启动消费者线程,由消费者线程声明完交换机和队列后将消费者线程杀死,此时再启动生产者线程发送消息,此处发送十条消息,由于队列最多存储6条消息,其余4条则发往死信队列,生产者程序执行完成后查看WEB端两个队列当中的消息条数即可。
8.1.3、消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false.
消费者(整体代码与情况1一致,区别在于接收到消息时的回调函数):
DeliverCallback deliverCallback = (consumeTag, message) ->{
String msg = new String(message.getBody(),"UTF-8");
if (msg.equals("info5")){
//拒绝该消息接收
System.out.println("拒绝该消息:" + msg);
channel.basicReject(message.getEnvelope().getDeliveryTag(),false);
}else {
System.out.println("Consumer1接收的消息是:" + new String(message.getBody(),"UTF-8"));
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
}
};
生产者和消费者线程一并启动,生产者代码与情况2当中的代码一致,当发送消息为:“info5”时正常消费者拒绝该消息,此时正常队列将该消息发送到死信队列进行处理。
8.1.4、死信消费者
private static final String DEAD_QUEUE = "c2";
public static void main(String[] args) throws Exception{
Channel channel = RabbitMqUtils.getChannel();
//声明 接收消息
DeliverCallback deliverCallback = (consumeTag, message) ->{
System.out.println("Consumer2接收的消息是:" + new String(message.getBody(),"UTF-8"));
};
//取消消费的回调
CancelCallback cancelCallback = (consumeTag) -> {
System.out.println("消息消费被中断");
};
channel.basicConsume(DEAD_QUEUE,true,deliverCallback,cancelCallback);
}
九、整合SpringBoot
添加依赖:
<dependencies>
<!--RabbitMQ 依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.47</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!--swagger-->
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<version>2.9.2</version>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
<version>2.9.2</version>
</dependency>
<!--RabbitMQ 测试依赖-->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
配置文件:
spring:
rabbitmq:
host: 43.138.78.150
port: 5672
username: admin
password: 123
virtual-host: /test
publisher-confirm-type: CORRELATED
publisher-returns: true
十、延迟队列
延迟队列是死信队列的第一种情况(消息TTL过期)进一步演化而来的,原因如下:
在我们上述死信队列代码演示当中说明了两种设置消息过期的方法,一种是发送消息时指明消息多久后过期,另外一种是设置某队列当中消息的过期时间,这两种方式是有差异的,如果设置了队列的 TTL 属性,那么一旦消息过期,就会被队列丢弃(如果配置了死信队列被丢到死信队列中),而第二种方式,消息即使过期,也不一定会被马上丢弃,因为消息是否过期是在即将投递到消费者之前判定的,如果当前队列有严重的消息积压情况,则已过期的消息也许还能存活较长时间;另外,还需要注意的一点是,如果不设置 TTL,表示消息永远不会过期,如果将 TTL 设置为 0,则表示除非此时可以直接投递该消息到消费者,否则该消息将会被丢弃。
10.1、简单延迟队列实现
实现逻辑:和上述的死信队列实现方式类似,区别在于我们不再需要正常消费者,只需要设置死信消费者即可。
代码结构图:创建两个队列 QA 和 QB,两者队列 TTL 分别设置为 10S 和 40S,然后在创建一个交换机 X 和死信交换机 Y,它们的类型都是 direct,创建一个死信队列 QD,它们的绑定关系如下:
生产者:
由于整合了SpringBoot,我们将不再使用main函数的形式发送消息,而是采用web界面的输入来发送消息。
@Slf4j
@RestController
@RequestMapping("/ttl")
public class SendMsgController {
//发消息
@GetMapping("/sendMsg/{message}")
public void sendMsg(@PathVariable String message){
log.info("当前时间:{},发送一条信息给两个TTL队列:{}",new Date(),message);
rabbitTemplate.convertAndSend("X","XA","消息来自TTL为10s的队列:"+message);
rabbitTemplate.convertAndSend("X","XB","消息来自TTL为40s的队列:"+message);
}
}
队列声明、交换机声明、绑定关系:
@Configuration
public class QueueTtlConfig {
//普通交换机的名称
private static final String NORMAL_EXCHANGE = "X";
//死信交换机的名称
private static final String DEAD_EXCHANGE = "Y";
//普通队列1的名称
private static final String NORMAL_QUEUE_1 = "QA";
//普通队列2的名称
private static final String NORMAL_QUEUE_2 = "QB";
//死信队列的名称
private static final String DEAD_QUEUE = "QD";
//声明X 交换机
@Bean("xExchange")
public DirectExchange xExchange(){
return new DirectExchange(NORMAL_EXCHANGE);
}
//声明Y 交换机
@Bean("yExchange")
public DirectExchange yExchange(){
return new DirectExchange(DEAD_EXCHANGE);
}
//声明普通队列A TTL为10s
@Bean("queueA")
public Queue queueA(){
Map<String,Object> arguments = new HashMap<>(3);
//过期时间-》10s
arguments.put("x-message-ttl",10_000);
//正常的队列设置消息变成死信之后发送给哪个死信交换机
arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
//设置死信routingKEY-》即死信消息发送给死信交换机后发往哪个死信队列
arguments.put("x-dead-letter-routing-key","YD");
return QueueBuilder.durable(NORMAL_QUEUE_1).withArguments(arguments).build();
}
//声明普通队列B TTL为40s
@Bean("queueB")
public Queue queueB(){
Map<String,Object> arguments = new HashMap<>(3);
//过期时间-》10s
arguments.put("x-message-ttl",40_000);
//正常的队列设置消息变成死信之后发送给哪个死信交换机
arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
//设置死信routingKEY-》即死信消息发送给死信交换机后发往哪个死信队列
arguments.put("x-dead-letter-routing-key","YD");
return QueueBuilder.durable(NORMAL_QUEUE_2).withArguments(arguments).build();
}
//声明死信队列
@Bean("queueD")
public Queue queueD(){
return QueueBuilder.durable(DEAD_QUEUE).build();
}
//队列QA和交换机X绑定
@Bean
public Binding queueABindingX(@Qualifier("queueA") Queue queue, @Qualifier("xExchange") DirectExchange directExchange){
return BindingBuilder.bind(queue).to(directExchange).with("XA");
}
//队列QB和交换机X绑定
@Bean
public Binding queueBBindingX(@Qualifier("queueB") Queue queue, @Qualifier("xExchange") DirectExchange directExchange){
return BindingBuilder.bind(queue).to(directExchange).with("XB");
}
//队列QD和交换机Y绑定
@Bean
public Binding queueDBindingY(@Qualifier("queueD") Queue queue, @Qualifier("yExchange") DirectExchange directExchange){
return BindingBuilder.bind(queue).to(directExchange).with("YD");
}
}
消费者:
@Component
@Slf4j
public class Consumer {
//接收消息
@RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE_NAME)
public void receiveConfirmMsg(Message message, Channel channel){
String msg = new String(message.getBody());
log.info("接收到的队列confirm_queue消息:{}",msg);
}
}
执行测试:
浏览器输入:http://localhost:8080/ttl/sendMsg/你好
可以看到死信消费者消费了两个队列发送给死信队列的消息。
10.2、延迟队列优化
在上面实现的延迟队列当中,我们的发送消息的延迟时间是固定的,只有10s和40s两种,但是在实际应用场景当中,我们的延迟时间是不固定的,换句话说,延迟时间是由用户的需求所决定的。因此,我们需要再生产者这边就决定消息的延迟时间。
代码架构图:在上面实现的架构当中添加队列QC,绑定关系如下,不设置其TTL时间。
生产者:
//发消息-自定义TTL
@GetMapping("/sendExpirationMsg/{message}/{ttlTime}")
public void sendExpirationMsg(@PathVariable String message,@PathVariable String ttlTime){
log.info("当前时间:{},发送一条过期时长为{}毫秒的消息给随机TTL队列:{}",new Date(),ttlTime,message);
rabbitTemplate.convertAndSend("X","XC","消息来自TTL为自定义的队列:"+message,msg ->{
//发送消息的时候 延迟时长
msg.getMessageProperties().setExpiration(ttlTime);
return msg;
});
}
消费者:消费者没有变化
队列声明、交换机声明、绑定关系:在上面的声明配置类当中添加如下代码
//普通队列3的名称
private static final String NORMAL_QUEUE_3 = "QC";
//声明普通队列C
@Bean("queueC")
public Queue queueC(){
Map<String,Object> arguments = new HashMap<>(3);
//正常的队列设置消息变成死信之后发送给哪个死信交换机
arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
//设置死信routingKEY-》即死信消息发送给死信交换机后发往哪个死信队列
arguments.put("x-dead-letter-routing-key","YD");
return QueueBuilder.durable(NORMAL_QUEUE_3).withArguments(arguments).build();
}
//队列QC和交换机X绑定
@Bean
public Binding queueCBindingX(@Qualifier("queueC") Queue queue,@Qualifier("xExchange") DirectExchange directExchange){
return BindingBuilder.bind(queue).to(directExchange).with("XC");
}
测试:
浏览器输入:
观察控制台我们发现就有问题了,20s时我们发送“你好1”,延迟时间20s,41s时死信队列收到该消息,有1s的误差是可以接受的,因为网络传输也需要时间,但是,我们30s时发送“你好2”,延迟时间2s,按道理来说是需要在32s左右时收到该消息,可以看到,收到该消息的时间与消息1一致,这就是我们上面所说的在发送消息时设置TTL属性的弊端。究其原因:因为RabbitMQ 只会检查第一个消息是否过期,如果过期则丢到死信队列,如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行。
10.3、解决延迟队列优化后的弊端
10.3.1、安装延迟队列插件
10.3.2、安装完成后
10.3.3、代码架构图
10.3.4、代码实现
声明:
@Configuration
public class DelayedQueueConfig {
//交换机
private static final String DELAYED_EXCHANGE_NAME = "delayed_exchange";
//队列
private static final String DELAYED_QUEUE_NAME = "delayed_queue";
//routing-key
private static final String DELAYED_ROUTING_KEY = "delayed_routingKey";
//声明队列
@Bean
public Queue delayedQueue(){
return new Queue(DELAYED_QUEUE_NAME);
}
//声明交换机
@Bean
public CustomExchange delayedExchange(){
Map<String,Object> arguments = new HashMap<>();
arguments.put("x-delayed-type","direct");
/**
* 1.交换机的名称
* 2.交换机的类型
* 3.是否需要持久化
* 4.是否需要自动删除
* 5.其他参数
*/
return new CustomExchange(DELAYED_EXCHANGE_NAME,"x-delayed-message",true,false,arguments);
}
//队列和交换机绑定
@Bean
public Binding delayedQueueBindingDelayedExchange(@Qualifier("delayedQueue") Queue queue,@Qualifier("delayedExchange") CustomExchange customExchange){
return BindingBuilder.bind(queue).to(delayedExchange()).with(DELAYED_ROUTING_KEY).noargs();
}
}
生产者:
//发消息-基于插件 发送消息+时间
@GetMapping("/sendDelayMsg/{message}/{delayTime}")
public void sendDelayedMsg(@PathVariable String message,@PathVariable Integer delayTime){
log.info("当前时间:{},发送一条过期时长为{}毫秒的消息给随机TTL队列:{}",new Date(),delayTime,message);
rabbitTemplate.convertAndSend("delayed_exchange","delayed_routingKey","消息来自TTL为自定义的队列:"+message,msg ->{
//发送消息的时候 延迟时长
msg.getMessageProperties().setDelay(delayTime);
return msg;
});
}
消费者:
@Slf4j
@Component
public class DelayQueueConsumer {
//接收消息
@RabbitListener(queues = "delayed_queue")
public void receiveD(Message message, Channel channel) throws Exception{
String msg = new String(message.getBody(),"UTF-8");
log.info("当前时间:{},收到死信队列的消息:{}",new Date(),msg);
}
}
10.3.5、测试验证
浏览器输入:
十一、发布确认高级模式
在上一篇中我们将了发布确认模式的必要性,我们此时考虑一个复杂的场景,例如,生产者发送消息时,MQ此时刚好宕机重启了,甚至整个MQ集群都不可用了,那重启期间生产者发送的消息就会丢失,需要我们手动处理和恢复。此时我们思考一个问题,如何保证消息可靠投递呢?换句话说,极端情况下,无法投递的消息我们该如何处理?
11.1、基础逻辑代码
配置类:
@Configuration
public class ConfirmConfig {
//交换机
public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange";
//队列
public static final String CONFIRM_QUEUE_NAME = "confirm_queue";
//routingKey
public static final String ROUTING_KEY = "key1";
//声明交换机
@Bean("confirmExchange")
public DirectExchange confirmExchange(){
return new DirectExchange(CONFIRM_EXCHANGE_NAME);
}
//声明队列
@Bean("confirmQueue")
public Queue confirmQueue(){
return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
}
//绑定
@Bean
public Binding queueBindingExchange(@Qualifier("confirmExchange") DirectExchange directExchange,
@Qualifier("confirmQueue") Queue queue){
return BindingBuilder.bind(queue).to(directExchange).with(ROUTING_KEY);
}
}
生产者:
@RestController
@RequestMapping("/confirm")
@Slf4j
public class ProducerController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/sendMsg/{message}")
public void sendMsg(@PathVariable String message){
rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,ConfirmConfig.ROUTING_KEY,message1.getBytes(StandardCharsets.UTF_8));
log.info("发送消息内容:{}",message);
}
消费者:
@Component
@Slf4j
public class Consumer {
//接收消息
@RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE_NAME)
public void receiveConfirmMsg(Message message, Channel channel){
String msg = new String(message.getBody());
log.info("接收到的队列confirm_queue消息:{}",msg);
}
}
11.2、消息发送失败以及消息回退处理(交换机未收到、队列未收到)
配置文件添加如下配置:
publisher-confirm-type: CORRELATED//发布消息成功到交换器后会触发回调方法
11.2.1、消息未被交换机或队列收到时的回调函数
在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息,如果发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个事件的。此时就需要添加如下配置:
publisher-returns: true//消息回退
@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
//注入
@PostConstruct
public void init(){
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnsCallback(this);
}
//交换机确认回调方法
/**
* 1.发消息 交换机收到了-》回调
* 1.1 correlationData 保存了回调消息的ID及相关信息
* 1.2 交换机是否收到消息 true
* 1.3 交换机没有收到消息的原因-》null
* 2.发消息 交换机接收失败了-》回调
* 2.1 correlationData 保存了回调消息的ID及相关信息
* 2.2 交换机是否收到消息 false
* 2.3 交换机没有收到消息的原因-》reason
*/
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
String id = correlationData != null ? correlationData.getId() : "0";
if (b){
log.info("交换机已经收到了ID为:{}的消息",id);
}else {
log.info("交换机没有收到ID为:{}的消息,原因为:{}",id,s);
}
}
//当生产者成功发送消息至交换机后但未发送至队列时的回调函数
//当消息不可达队列时才会触发此函数
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
log.error("消息{}被交换机{}退回,退回的原因:{},路由key:{}",
returnedMessage.getMessage().getBody().toString(),
returnedMessage.getExchange(),
returnedMessage.getReplyText(),
returnedMessage.getRoutingKey());
}
}
验证:
①、交换机错误
将发送消息时的交换机名称进行修改,即可观察到所触发的回调函数。
rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME + "123",ConfirmConfig.ROUTING_KEY,message1.getBytes(StandardCharsets.UTF_8),correlationData);
②、路由错误
生产者代码修改:
@GetMapping("/sendMsg/{message}")
public void sendMsg(@PathVariable String message){
String message1 = message + "key1";
CorrelationData correlationData = new CorrelationData();
String id = String.valueOf(UUID.randomUUID());
correlationData.setId(id);
rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,ConfirmConfig.ROUTING_KEY,message1.getBytes(StandardCharsets.UTF_8),correlationData);
log.info("发送消息1内容:{}",message + "key1");
CorrelationData correlationData2 = new CorrelationData();
String id2 = String.valueOf(UUID.randomUUID());
correlationData2.setId(id2);
String message2 = message + "key12";
rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,ConfirmConfig.ROUTING_KEY + "2",message2.getBytes(StandardCharsets.UTF_8),correlationData2);
log.info("发送消息2内容:{}",message + "key12");
}
十二、备份交换机
有了 mandatory 参数和回退消息,我们获得了对无法投递消息的感知能力,有机会在生产者的消息无法被投递时发现并处理。但有时候,我们并不知道该如何处理这些无法路由的消息,最多打个日志,然后触发报警,再来手动处理。而通过日志来处理这些无法路由的消息是很不优雅的做法,特别是当生产者所在的服务有多台机器的时候,手动复制日志会更加麻烦而且容易出错。而且设置 mandatory 参数会增加生产者的复杂性,需要添加处理这些被退回的消息的逻辑。如果既不想丢失消息,又不想增加生产者的复杂性,该怎么做呢?前面在设置死信队列的文章中,我们提到,可以为队列设置死信交换机来存储那些处理失败的消息,可是这些不可路由消息根本没有机会进入到队列,因此无法使用死信队列来保存消息。在 RabbitMQ 中,有一种备份交换机的机制存在,可以很好的应对这个问题。什么是备份交换机呢?备份交换机可以理解为 RabbitMQ 中交换机的“备胎”,当我们为某一个交换机声明一个对应的备份交换机时,就是为它创建一个备胎,当交换机接收到一条不可路由消息时,将会把这条消息转发到备份交换机中,由备份交换机来进行转发和处理,通常备份交换机的类型为 Fanout ,这样就能把所有消息都投递到与其绑定的队列中,然后我们在备份交换机下绑定一个队列,这样所有那些原交换机无法被路由的消息,就会都进入这个队列了。当然,我们还可以建立一个报警队列,用独立的消费者来进行监测和报警。
12.1、代码结构图及其逻辑
1.Web端输入消息内容
2.控制器将消息封装成两份,message1和message2,message1使用正确的routing-key路由到队列当中,message2使用错误的routing-key路由不到队列中。
3.message1由正常消费者进行消费,message2回退至交换机,由交换机发往备份交换机进行处理,最后由报警队列的消费者进行消费。
12.2、代码
1.生产者:生产者代码使用发布确认高级模式当中的生产者代码。
2.普通交换机及其队列声明:
声明同样采用发布确认高级模式当中的代码,只需要将交换机声明的代码修改为如下代码:
//声明交换机
@Bean("confirmExchange")
public DirectExchange confirmExchange(){
Map<String,Object> arguments = new HashMap<>();
arguments.put("alternate-exchange",MsgFailSendConfig.BACKUP_EXCHANGE_NAME);
return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME).durable(true).withArguments(arguments).build();
}
使用如上配置后,消息未路由到队列时,将消息发送至备份交换机。
3.备份交换机及其队列声明:
@Configuration
public class MsgFailSendConfig {
//备份交换机
public static final String BACKUP_EXCHANGE_NAME = "backup_exchange";
//备份队列
public static final String BACKUP_QUEUE_NAME = "backup_queue";
//报警队列(或者二次处理队列)
public static final String WARNING_QUEUE_NAME = "warning_queue";
//声明备份交换机
@Bean("backupExchange")
public FanoutExchange backupExchange(){
return new FanoutExchange(BACKUP_EXCHANGE_NAME);
}
//声明备份队列和报警队列
@Bean("backupQueue")
public Queue backupQueue(){
return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();
}
@Bean("warningQueue")
public Queue warningQueue(){
return QueueBuilder.durable(WARNING_QUEUE_NAME).build();
}
//绑定
@Bean
public Binding backupBindingExchange(@Qualifier("backupQueue") Queue queue,
@Qualifier("backupExchange") FanoutExchange fanoutExchange){
return BindingBuilder.bind(queue).to(fanoutExchange);
}
@Bean
public Binding warningBindingExchange(@Qualifier("warningQueue") Queue queue,
@Qualifier("backupExchange") FanoutExchange fanoutExchange){
return BindingBuilder.bind(queue).to(fanoutExchange);
}
}
12.3、效果验证
浏览器输入:
http://localhost:8080/confirm/sendMsg/你好
控制台效果:
十三、优先级队列
13.1、使用场景
13.2、如何添加
a.控制台页面添加
b.代码层面添加
Map<String,Object> arguments = new HashMap<>();
arguments.put("x-max-priority",10);//设置最大优先级 取值范围0-255 优先值越大,优先级越高
/**
* 创建一个队列
* 1.队列名称
* 2.队列里面的消息是否持久化(磁盘),默认情况下消息存储在内存中(并不进行持久化)
* 3.该队列是否只供一个消费者进行消费 是否进行消息的共享,true可以多个消费者消费,默认为false:只能一个消费者消费
* 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true自动删除 false不自动删除
* 5.其他参数
*/
channel.queueDeclare(QUEUE_NAME,false,false,false,arguments);
13.3、实战
生产者:
public class Producer {
//队列名称
private static final String QUEUE_NAME = "hello";
//发消息
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
Map<String,Object> arguments = new HashMap<>();
arguments.put("x-max-priority",10);//设置最大优先级 取值范围0-255 优先值越大,优先级越高
/**
* 创建一个队列
* 1.队列名称
* 2.队列里面的消息是否持久化(磁盘),默认情况下消息存储在内存中(并不进行持久化)
* 3.该队列是否只供一个消费者进行消费 是否进行消息的共享,true可以多个消费者消费,默认为false:只能一个消费者消费
* 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true自动删除 false不自动删除
* 5.其他参数
*/
channel.queueDeclare(QUEUE_NAME,false,false,false,arguments);
for (int i = 11; i < 21; i++) {
String message = "info:" + i;
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().priority(i-10).build();//设置消息优先级
/**
* 发送一个消息
* 1.发送到哪个交换机
* 2.路由的key值是哪个 本次是队列名称
* 3.其他参数信息
* 4.发送消息的消息体
*/
channel.basicPublish("",QUEUE_NAME,properties,message.getBytes(StandardCharsets.UTF_8));
System.out.println("消息体:" + message + " 优先级:" + (i - 10));
}
}
}
消费者:
public class Consumer {
//队列的名称
public static final String QUEUE_NAME = "hello";
//接收消息
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
/**
* 声明 接收消息
*/
DeliverCallback deliverCallback = (consumeTag, message) ->{
System.out.println(new String(message.getBody()));
};
//取消消费的回调
CancelCallback cancelCallback = (consumeTag) -> {
System.out.println("消息消费被中断");
};
/**
* 消费者消费消息
* 1.消费哪个队列
* 2.消费成功之后是否自动应答 true代表的是自动应答 false代表的是手动应答
* 3.当一个消息发送过来后的回调接口
* 4.消费者取消消费的回调
*/
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
}
}
结果验证:
十四、惰性队列
14.1、使用场景
14.2、两种模式
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-queue-mode", "lazy");
channel.queueDeclare("myqueue", false, false, false, args);