目录
生产者确认
消息持久化
消费者确认
TTL延迟队列
TTL延迟消息
惰性队列
生产者确认
生产者确认就是:发送消息的人,要确保消息发送给了消息队列,分别是确保到了交换机,确保到了消息队列这两步。
1、在发送消息服务的application.yml中添加配置
spring:
rabbitmq:
publisher-confirm-type: correlated # 异步回调
publisher-returns: true
template:
mandatory: true
2、确保消息到交换机
package cn.zsh.mq.spring;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.util.concurrent.ListenableFutureCallback;
import java.util.UUID;
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testConfirmCallBack() {
// 1、定义消息
String message = "ABC";
// 设置一个消息的唯一ID
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
// 3、confirm-ack
correlationData.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
@Override
public void onFailure(Throwable ex) {
System.out.println("消息发送异常:" + ex.toString());
}
@Override
public void onSuccess(CorrelationData.Confirm result) {
if (result.isAck()) {
// 说明到了交换机
System.out.println("publish-confirm:ack==消息发送成功:" + correlationData.getId());
} else {
// 消息没有到交换机
System.out.println("publish-confirm:nack==消息发送失败:" + correlationData.getId());
}
}
});
// 4、消息发送
rabbitTemplate.convertAndSend("191exchange","191",message,correlationData);
}
}
3、确保消息从交换机路由到队列
创建公开CommonConfig类
package cn.zsh.mq.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Configuration;
/**
* 发送消息到交换机没有到消息队列
*/
@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
// 1、获取RabbitTemplate(获取启动中的Bean的方式)
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
// 2、设置回调函数
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.error("发送消息失败,没到队列===消息:{}, 交换机:{}, 路由Key:{}, 响应CODE:{}, 相应内容:{}", message,exchange,routingKey,replyCode,replyText);
}
});
}
}
消息持久化
消息持久化就是:确保消息不会在交换机或者队列中丢失。
案例:
使用SpringAMQP创建出来的交换机和队列,默认就是做了持久化的
package cn.zsh.mq.config;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
/**
* 创建交换机与队列
*/
@Component
public class FoundQueue {
@Bean
public DirectExchange qiuExchange(){
// 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除
return new DirectExchange("qiu.deirect",true,false);
}
@Bean
public Queue piqiuQueue(){
// 使用QueueBuilder构建队列,durable就是持久化的
return QueueBuilder.durable("piqiu.queue").build();
}
}
消费者确认
消费者确认就是:消费者把消息从队列中获取出来,然后要消费成功,队列中的消息才能被删除掉。
方案一:消费者确认
加入这个配置以后,消费者消费失败,会直接重试或者删除,具体取决于设置的是none还是auto。
默认是none,不建议设置为auto模式因为会一直不断地尝试,这样会导致服务器压力很大。
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: auto # none:投递完立马删除 auto:失败后让你再次重试(重新投递到队列)知道成功
方案二:消费者失败重试,重试固定次数后,则删除当前消息
加入这个配置以后,消费者消费失败会重试固定的次数,然后将消息删除。
spring:
rabbitmq:
listener:
simple:
retry:
enabled: true # 开启消费者失败重试
initial-interval: 1000 # 初始的失败等待时长为1秒
multiplier: 1 # 下次失败的等待时长倍数,下次等待时长 = multiplier * initial-interval
max-attempts: 3 # 最大重试次数
stateless: true # ture:无状态 false:有状态。如果业务中包含事务,这里改成false
方案三:消费者失败重试,重试固定次数后,将当前消息发送给error交换机路由给error队列
加入这个配置之后,重试固定次数后,会将这条消费失败的消息发送给error交换机,路由给error队列。
1、在消费者(消息接收者)中加入配置
spring:
rabbitmq:
listener:
simple:
retry:
enabled: true # 开启消费者失败重试
initial-interval: 1000 # 初始的失败等待时长为1秒
multiplier: 1 # 下次失败的等待时长倍数,下次等待时长 = multiplier * initial-interval
max-attempts: 3 # 最大重试次数
stateless: true # ture:无状态 false:有状态。如果业务中包含事务,这里改成false
2、创建error交换机和队列并绑定
package cn.zsh.mq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
@Component
public class ErrorConfig {
/**
* 定义交换机
* @return
*/
@Bean
public DirectExchange errorExchange2(){
return new DirectExchange("error.direct");
}
/**
* 定义队列
* @return
*/
@Bean
public Queue errorQueue2(){
return new Queue("error.queue");
}
@Bean
public Binding bindErrorQueue(DirectExchange errorExchange2,Queue errorQueue2){
return BindingBuilder.bind(errorQueue2).to(errorExchange2).with("error");
}
}
3、在启动类或者配置类中加入配置
package cn.zsh.mq;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
@SpringBootApplication
public class ConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class, args);
}
@Bean
public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate, "error.direct","error");
}
}
TTL延迟队列
延迟队列:
延迟队列就是消息发送到当前队列,会延迟一段时间,然后进行处理,具体如下图:
发送消息给指定队列,然后消息回延迟固定的时间,这个延迟时间是在对应的延迟消息队列中设置的。经过延迟以后,会将消息发送给其他的交换机,然后再路由给对应的消息队列,再进行消费,实现延迟的效果。
使用案例:
1、创建处理延迟消息的队列和交换机
当前交换机名称为:dl.direct
当前消息队列名称为:dl.queue
RoutingKey是:dl
package cn.zsh.mq.config;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* 创建,处理延迟消息的,交换机和队列
*/
@Component
public class TtlConfig {
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "dl.queue",durable = "true"), // 处理延迟消息的队列名称
exchange = @Exchange(name = "dl.direct",durable = "true"), // 处理延迟消息的交换机名称
key = "dl" // 当前的RoutingKey
))
public void consumerDdlMessage(String message){
System.out.println("接收到延迟消息:" + message);
}
}
2、创建延迟交换机、队列,并绑定
package cn.zsh.mq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class publisherTtlConfig {
@Bean
public DirectExchange ttlDirectExchange(){
return new DirectExchange("ttl.direct"); // 延迟交换机名称
}
@Bean
public Queue ttlQueue(){
return QueueBuilder.durable("ttl.queue") // 延迟队列名称
.ttl(10000) // 延迟队列的延迟时间(当前为10秒)
.deadLetterExchange("dl.direct") // 设置延时时间到了以后发送到哪个交换机(处理延迟消息的交换机)
.deadLetterRoutingKey("dl") // 设置具体到那个交换机的具体队列(处理延迟消息队列的RoutingKey)
.build();
}
/**
* 绑定延迟队列与交换机
* @return
*/
@Bean
public Binding bingTtlQueue(){
return BindingBuilder.bind(ttlQueue()).to(ttlDirectExchange()).with("ttl"); // 将延迟队列与交换机绑定,并设置RoutingKey(这个RoutingKey是当前延迟消息队列的RoutingKey)
}
}
3、发送任意消息到当前延迟队列(ttl.queue)即可实现延迟效果。
延时时间到了以后,会将消息发送给(dl.direct)交换机,路由给RoutingKey为(dl)的消息队列dl.queue。有绑定了dl.queue的队列进行消息的最终处理。
TTL延迟消息
延迟消息:
延迟消息是给消息设置延迟时间,然后将消息发送给延迟队列,可以实现延迟。
注意!!!延迟消息的延迟时间,与延迟队列的延迟时间,哪个时间短,就使用哪个延迟时间。
例1:延迟消息设置延迟时间为5秒,延迟队列设置延迟时间为10秒,则消息发送给延迟队列后,延迟5秒然后就会被处理。
例2:延迟消息设置延迟时间为20秒,延迟队列设置延迟时间为10秒,则消息发送给延迟队列后,延迟10秒然后就会被处理。
使用案例:
延迟消息必须发送给延迟队列,因为延迟时间按最短的执行。发送给没有设置延迟时间的消息队列,会直接被消费。
package cn.zsh.mq.spring;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.util.concurrent.ListenableFutureCallback;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testTTLMessage(){
// 创建消息
Message message = MessageBuilder
.withBody("这是一条延时5秒后执行的消息".getBytes(StandardCharsets.UTF_8))
.setExpiration("5000")// 延时时间
.build();
// 消息ID,需要封装到CorrelationData中
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
// 发送消息(这个消息必须要发给延时的消息队列)
rabbitTemplate.convertAndSend("ttl.direct","ttl",message,correlationData);
}
}
惰性队列
惰性队列是为了防止消息大量积压的一种队列。
消息队列中的消息一般都存在内存中,而消息大量积压,就会产生很多问题,这时候可以使用惰性队列,惰性队列的消息保存在磁盘中。
创建惰性队列:
方案一:
在代码中声明
package cn.itcast.mq.config;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class CommonConfig {
@Bean
public Queue lazyQueue(){
return QueueBuilder
.durable("lazy.queue")
.lazy()
.build();
}
}
方案二:
在浏览器中MQ的控制台声明
第一步:
第二步:
额外补充: