前言
mq的优点:异步提速、解耦、流量削峰
mq的缺点: mq宕机可能导致消息丢失、消费者未成功消费如果保证整个系统架构的事务安全、消息可能被重复消费出现幂等问题、消息未被消费到引发死信问题、消息出现消费延迟或消费异常引发的顺序消费错乱问题...
mq的使用建议:系统扛不住了,扩容太贵了,不得不使用了
以下将根据官网提供的主流工作模式描述mq使用流程
一、队列生产消费模式
1、简单模式
消费者创建
public class SimpleConsumer {
public static final String QUEUE_NAME = "hello world";
//队列的名称
public static void consumeMessage() throws IOException, TimeoutException {
//创建工厂
ConnectionFactory factory = new ConnectionFactory();
//工厂IP 连接rabbitmq队列
factory.setHost("127.0.0.1");
//设置用户名称
factory.setUsername("guest");
//设置密码
factory.setUsername("guest");
//创建连接
Connection connection = factory.newConnection();
//获取信道
Channel channel = connection.createChannel();
//声明 接受消息
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println(new String(message.getBody()));
};
//取消消息回调
CancelCallback cancelCallback = consumerTag -> {
System.out.println("消息消费被中断");
};
//消费者消费消息:消费哪个队列、消费成功之后是否要自动应答(true:自动应答,false:手动应答)、消费失败的回调、取消消息的回调
System.out.println(channel.basicConsume(QUEUE_NAME, false, deliverCallback, cancelCallback));
}
}
生产者创建
public class SimpleProducer {
public static final String QUEUE_NAME = "hello world";
//消息发送
public static void buildMessage() throws IOException, TimeoutException {
//创建工厂
ConnectionFactory factory = new ConnectionFactory();
//工厂IP 连接rabbitmq队列
factory.setHost("127.0.0.1");
//设置用户名称
factory.setUsername("guest");
//设置密码
factory.setUsername("guest");
//创建连接
Connection connection = factory.newConnection();
//获取信道
Channel channel = connection.createChannel();
//创建队列:
// 设置队列名称、是否持久化、是否共享队列消息的消费(true:共享消费,false:只有一个消费者消费)、是否自动删除(true:消息用完后自动删除、false:)、其它参数(延迟、死信消息处理用)
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//发送消息
String message = "hello world!!!";
//消息发送
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
}
}
2、工作队列模式
工作队列(任务队列)主要是消费端注册多个消费者以加速消息消费,防止消息消费速度慢、延迟、死信等异常,适用于任务比较重的地方
模式特点:
1、一条消息只会被一个消费者接收;
2、rabbit采用轮询的方式将消息是平均发送给消费者的;
3、消费者在处理完某条消息后,才会收到下一条消息
代码和简单模式没区别,额外要做的就是对消费者线程多开,可以对线程做些标记观察是否真的有轮询消费
3、Pub/Sub订阅模式
与之前的模式相比,该模式可以提供消息共享,即一个消息被多个消费者消费,但是一个队列中的消息仍然只能消费一次,那如何做到的多消费呢?答案是利用交换订阅技术,即由交换机来路由消息给多个队列从而实现多次消费
Exchange(交换机)
交换机的工作非常简单,一方面接受来自生产者的消息,另一方面将消息推到队列。关键的点在于交配机必须精确处理消息,即把消息推到哪个队列或者是否应该丢弃
交换机类型描述:直接(direct/route)、主题(topic)、标题(headers)、扇出(fanout)
注:比如前面代码发布消息时channel.basicPublish方法指定的exchange是""或者null,实际走的是name为default的exchange,其类型为direct,routingkey默认是队列名称
临时队列
未设置持久配置都是临时队列,即durable参数配置的值不是durable,哦還要看生成者是否配置了autodelete,如果autodelete參數配置的值是true也是臨時隊列,因为用完就把队列删除了嘛谁还管队列是否配置了持久
创建临时队列的快速方法:String queueName = channel.queueDeclare().getQueue()
Bind
交换机绑定queue
Fanout
扇出这种类型就是广播,我不知道谁做的中文翻译,我*你**,就是消息广播到所绑定的所有队列,和routingkey没有关系
4、Direct订阅模式
这个和fanout模式差不多,区别在于exchange的type是direct,需要在bind队列时设置routing key,exchange根据routing key找到bind的queue,然后路由消息到这个queue
5、Topic
这个是对direct的扩展,direct的缺点在于需要硬编码到代码,如果以后要扩展别的队列还需要去生产者手动添加新routing key,当然我们可以通过服务配置来弥补这个问题,但是维护也需要人力,而topic可以很好的解决这个耦合问题
topic模式要求
交换机的routing key不能随意写,必须是一个单词列表,以 . 分隔,单词列表长度<=255
还有两个替换符,*代替一个单词,#代替零或多个单词,例如:*.orange.*;lazy.# ...
二、死信队列
死信:
死信是指无法被消费的消息,一般来说produer将消息投递到queue,consumer从queue取出消息进行消费,但是某些时候由于特定的原因导致queue中的某些消息无法被消费,这样的消息如果没有后续的处理,就是死信消息,为了保存这些死信消息就有了死信队列。
应用场景:
为了保证订单业务的消息数据不丢失,需要使用rabbitmq的死信队列机制,当消息消费发生异常,将消息投入死信队列中。
死信来源:
消息ttl过期
队列达到最大长度(队列满了,无法再添加数据到mq)
消息被拒绝(basic.reject或者basic.nack)并且requeue=false
消费者都宕机了
三.延迟队列
延迟队列是用来存放需要在指定时间被处理的元素的队列
1.应用场景
外卖订单规定在15min之内下单,否则失效
新创建的店铺,如果十天内没有上传商品,自动发送消息提醒
用户注册账户成功,如果三天内没有登陆进行短信提醒
用户发起退款,如果三天内没有得到处理会通知线下服务人员
预定会议后,在预定时间的前15分钟消息提醒参会人员
2.代码操作
2.1.框架图
创建两个队列 QA 和 QB,两个队列 TTL 分别设置为 10s 和 40s,然后再创建一个交换机 X 和死信交换机 Y,它们的类型都是 direct,创建一个死信队列 QD,它们的绑定关系如下:
2.2.配置信息
@Configuration
public class TtlQueueConfig {
/**
* 普通交换机名称
*/
public static final String X_EXCHANGE = "X";
/**
* 死信交换机名称
*/
public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
/**
* 普通队列名称
*/
public static final String QUEUE_A = "QA";
public static final String QUEUE_B = "QB";
/**
* 死信队列名称
*/
public static final String DEAD_LETTER_QUEUE = "QD";
/**
* 声明 XExchange
*/
@Bean
public DirectExchange xExchange(){
return new DirectExchange(X_EXCHANGE);
}
/**
* 声明 yExchange
*/
@Bean
public DirectExchange yExchange(){
return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
}
/**
* 声明队列QA
*/
@Bean
public Queue queueA(){
Map<String, Object> arguments = new HashMap<>(3);
// 设置死信交换机
arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
// 设置死信路由键
arguments.put("x-dead-letter-routing-key", "YD");
// 设置过期时间
arguments.put("x-message-ttl", 10000);
return new Queue(QUEUE_A, true, false, false, arguments);
}
/**
* 声明队列QB
*/
@Bean
public Queue queueB(){
Map<String, Object> arguments = new HashMap<>(3);
// 设置死信交换机
arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
// 设置死信路由键
arguments.put("x-dead-letter-routing-key", "YD");
// 设置过期时间
arguments.put("x-message-ttl", 40000);
return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build();
}
/**
* 死信队列QD
*/
@Bean
public Queue queueD(){
return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();
}
/**
* 绑定
*/
@Bean
public Binding queueABindingX(@Qualifier("queueA") Queue queueA,@Qualifier("xExchange") DirectExchange xExchange){
return BindingBuilder.bind(queueA).to(xExchange).with("XA");
}
@Bean
public Binding queueBBindingX(){
return new Binding(QUEUE_B, Binding.DestinationType.QUEUE, X_EXCHANGE, "XB", null);
}
@Bean
public Binding queueDBindingY(@Qualifier("queueD") Queue queueD,@Qualifier("yExchange") DirectExchange yExchange){
return BindingBuilder.bind(queueD).to(yExchange).with("YD");
}
}
2.3.消息生产者代码
@Slf4j
@RestController
@RequestMapping("/ttl")
public class SendMsgController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/sendMsg/{message}")
public String sendMsg(@PathVariable String message){
log.info("当前时间:{}发送一条消息{}给两个队列", new Date(), message);
rabbitTemplate.convertAndSend("X", "XA", "消息来自TTL为10s队列QA:"+message);
rabbitTemplate.convertAndSend("X", "XB", "消息来自TTL为40s队列QB:"+message);
return "发送成功";
}
}
2.4.消息消费者代码
@Slf4j
@Component
public class DeadLetterConsumer {
@RabbitListener(queues = "QD")
public void receiveD(Message message){
String msg = new String(message.getBody());
log.info("当前时间{},收到死信队列的消息:{}", new Date(), msg);
}
}
2.5.测试
发送一个请求 http://localhost:8080/ttl/sendMsg/嘻嘻嘻
第一条消息在 10s 后变成了死信消息,然后被消费者消费掉了,第二条消息在 40s 之后变成了死信消息,然后被消费掉,这样一个延时队列就完成了。
2.6.缺点
queueA和queueB都是通过queue设置ttl,如此一来生产者无法灵活设置消息的ttl
2.7.优化
让生产者也参与设置ttl,即生产者在生产消息时设置ttl,如此一来queue即使没有设置ttl也可以实现延时操作,扩展性大幅提高。
【注:】如果生产者和queue同时设置ttl,则以最短的ttl为有效值
如图,新增一个队列 QC,绑定关系如下,该队列不设置 TTL 时间
2.8.配置信息
@Component
public class MsgTtlQueueConfig {
private static final String Y_DEAD_LETTER_EXCHANGE = "Y";
private static final String QUEUE_C = "QC";
@Bean("queueC")
public Queue queueC(){
Map<String, Object> arguments = new HashMap<>(2);
// 声明当前队列绑定的死信交换机
arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
// 声明当前队列的私信路由key
arguments.put("x-dead-letter-routing-key", "YD");
return new Queue(QUEUE_C, false, false, false, arguments);
}
@Bean
public Binding queueCBindingX(@Qualifier("queueC") Queue queueC,
@Qualifier("xExchange") DirectExchange xExchange){
return BindingBuilder.bind(queueC).to(xExchange).with("XC");
}
}
2.9.消费生产者
@GetMapping("/sendExpirationMsg/{message}/{ttlTime}")
public String sendMsg(@PathVariable String message, @PathVariable String ttlTime){
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setExpiration(ttlTime);
return message;
}
};
rabbitTemplate.convertAndSend("X", "XC", message, messagePostProcessor);
return "发送成功";
}
将程序执行,然后发送请求:
http://localhost:8080/ttl/sendExpirationMsg/你好 1/20000
http://localhost:8080/ttl/sendExpirationMsg/你好 2/2000
两条消息的过期时间一致,过期时间短的那条消息,在过期时间到了以后并没有立即被消费,而是和过期时间长的那条消息一起被消费了。所以,如果使用在消息属性上设置 TTL 的方式,消息可能并不会按时“死亡”,因为 RabbitMQ 只会检查第一个消息是否过期,如果过期则丢到死信队列,如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行。
2.10.Rabbitmq 插件实现延迟队列
上面提到的问题,确实是一个问题,如果不能实现TTL的细粒度,并使其在设置的 TTL 时间及时死亡,就无法设计成一个通用的延时队列。
原理:
以往的ttl都是由队列处理,而插件延迟功能是由exchange来管理,当ttl达到截至时间再发送到对应的queue
安装:
这里不做演示,可百度去官网下载和安装对应版本的插件(好文推荐),安装完成后重启如下所示
新增一个队列 delayed.queue,一个自定义交换机 delayed.exchange,绑定关系如下:
我们自定义的交换机中,这是一种新的交换机类型,该类型消息支持延迟投递机制,消息传递后并不会立即投递到目标队列中,而是存储在 mnesia(一个分布式数据系统)表中,当达到投递时间时,才投递到目标队列中。
2.11.配置信息
@Configuration
public class DelayedQueueConfig {
public static final String DELAYED_QUEUE_NAME = "delayed.queue";
public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
public static final String DELAYED_ROUTING_KEY = "delayed.routingKey";
@Bean("delayedQueue")
public Queue delayedQueue(){
return new Queue(DELAYED_QUEUE_NAME);
}
/**
* 自定义交换机 定义一个延迟交换机
* 不需要死信交换机和死信队列,支持消息延迟投递,消息投递之后没有到达投递时间,是不会投递给队列
* 而是存储在一个分布式表,当投递时间到达,才会投递到目标队列
* @return
*/
@Bean("delayedExchange")
public CustomExchange delayedExchange(){
Map<String, Object> args = new HashMap<>(1);
// 自定义交换机的类型
args.put("x-delayed-type", "direct");
return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args);
}
@Bean
public Binding bindingDelayedQueue(@Qualifier("delayedQueue") Queue delayedQueue,
@Qualifier("delayedExchange") CustomExchange delayedExchange){
return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
}
}
2.12.生产消息
@Slf4j
@RestController
@RequestMapping("/ttl")
public class SendMsgController {
@Autowired
private RabbitTemplate rabbitTemplate;
public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
public static final String DELAYED_ROUTING_KEY = "delayed.routingKey";
@GetMapping("/sendDelayMsg/{message}/{delayTime}")
public String sendMsg(@PathVariable String message, @PathVariable Integer delayTime){
rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, message, messagePostProcessor ->{
messagePostProcessor.getMessageProperties().setDelay(delayTime);
return messagePostProcessor;
});
log.info("当前时间:{},发送一条延迟{}毫秒的信息给队列delay.queue:{}", new Date(), delayTime, message);
return "发送成功";
}
}
2.13.消费消息
@Slf4j
@Component
public class DeadLetterConsumer {
public static final String DELAYED_QUEUE_NAME = "delayed.queue";
@RabbitListener(queues = DELAYED_QUEUE_NAME)
public void receiveDelayedQueue(Message message){
String msg = new String(message.getBody());
log.info("当前时间:{},收到延时队列的消息:{}", new Date(), msg);
}
}
发起请求:
http://localhost:8080/ttl/sendDelayMsg/come on baby1/20000
http://localhost:8080/ttl/sendDelayMsg/come on baby2/2000
第二条消息被先消费掉了,符合预期
2.14.总结
延时队列在需要延时处理的场景下非常有用,使用 RabbitMQ 来实现延时队列可以很好地利用 RabbitMQ 的特性,如:消息可靠发送、消息可靠投递、死信队列,来保证消息至少被消费一次以及未被正确处理的消息不会被丢弃。另外,通过 RabbitMQ 集群的特性,可以很好要的解决单点故障问题,不会因为单个节点挂掉导致延时队列不可用或者消息丢失。
四、发布确认
如果由于异常重启rabbitmq,那么中间的消息将会投递失败且丢失,只能数据补偿。生产环境如何保证异常下的数据不会丢失?
最好的方法只能是消息先行磁盘存储,如果服务异常或者重启仍然可以追回消息
发布确认使用方式
单消息发布确认:channel发布消息,然后channel执行waitForConfirms(),虽然简单但是低效,1000个简单消息处理耗时1s
批量消息发布确认:channel发布一批消息(例如for循环发布),然后channel执行waitForConfirms(),1000个简单消息处理耗时0.15s
异步发布确认: channel发布消息,然后channel执行 addReturnListener(ReturnCallback returnCallback),1000个简单消息处理耗时0.06s,不过需要你实现里面的两个回调接口并作为参数传入,一个是发布确认接口,另一个是不可达消息处理接口
springBoot下发布确认使用方式
spring-boot-starter-amqp提供了配置信息:spring.rabbitmq.publisher-confirm-type,主要用于设置发布确认类型
correlated:成功发布消息到交换机,可以异步触发回调
simple:支持rabbittemplate使用waitForConfirms方法进行同步确认
none:禁用发布模式
异步处理不可达消息方式
在仅开启生产者确认机制的情况下,交换机接收到消息后会直接给消息生产者发送确认消息,如果发现该消息不可路由,那么消息被直接丢弃,生产者也感知不到消息被丢弃。这是一种不可靠的情况。解决方法就是设置mandatory参数,即生产者设置回退消息回调方法用来处理回退消息。
springBoot的mandatory设置方式:spring.rabbitmq.publisher-returns=true
代码演示
发布确认和不可达消息处理写一块了,如下
@Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init(){
//注入发布确认回调接口
rabbitTemplate.setConfirmCallback(this);
// 注入失败回退回调接口
rabbitTemplate.setReturnsCallback(this);
}
/**
* 交换机确认回调方法
* 1.发送消息 交换机收到消息 回调
* correlationData 保存回调消息的ID和相关信息
* 交换机收到消息 ack = true
* cause null
* 2.发送消息 交换机接受失败 回调
* correlationData 保存回调消息的ID和相关信息
* 交换机收到消息 ack = false
* cause 失败原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
String id = correlationData!=null ? correlationData.getId() : "";
if(ack){
log.info("确认回调:id:{}", id);
}else {
log.info("交换机未接受到消息,id:{},原因:{}", id, cause);
}
}
//将不可达的消息传递给生产者
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
log.error("消息:{};\r\n退回消息交换机:{};\r\n退回原因:{};\r\nrouting key:{}",new String(returnedMessage.getMessage().getBody()), returnedMessage.getExchange(), returnedMessage.getReplyText(), returnedMessage.getRoutingKey());
}
}
五、备份交换机
有了mandatory和回退消息回调方法,我们有机会去处理不可达的消息,但是有时候我们并不知道该如何处理这些不可达的消息,最多进行日志打印,然后触发告警,最后补偿数据,为了更方便地解决这个问题催生了备份交换机
备份交换机是指为普通交换机指定一个备用交换机,当交换机无法路由消息时就转给备份交换机来处理
【注:】
如果同时设置了备份交换机和mandatory设置,会先执行备份交换机处理,最后再由mandatory设置的回退消息回调方法来兜底
六、其它
1.幂等
幂等就是对同一操作的重复执行,如删除操作和查询操作,天然兼容幂等,但是新增和修改必须解决幂等影响,比如查询某个用户的流水,不管怎么重复操作都不会影响流水数据,删除也一样,但是新增就麻烦了,比如付款,对当前订单的付款如果重复付款肯定是业务异常的,修改也是如此
消费端的幂等保障一般是:1 唯一ID+指纹码机制,利用数据库去重;2 利用redis原子性实现。唯一码是一些规则或者时间戳加别的服务信息拼接成的唯一码,要么就是redis执行setnx指令,天然支持幂等
2.优先级队列
在系统中有一个订单催付的业务,商城会在订单有效期内推送消息给客户作为一个下单提醒,很细节的功能,但是展开来看还是有点东西的,比如系统会区分大小客户,比如高价订单会被优先处理,这涉及到队列的优先级设置,简单系统可以利用定时任务轮询,但是发杂系统还是要用消息队列,rabbitmq就提供了一个优先级配置参数
2.1.优先级队列控制台设置方式
标记会有个pri,表示该队列有权限参数配置
2.2.优先级队列代码设置方式
配置信息
@Configuration
public class PriorityConfig {
@Bean
public Queue queuePri(){
// 优先级以小为优先,设置范围0-255,不用设置太大消耗内存
return QueueBuilder.durable("priority_queue").withArgument("x-max-priority", 10).build();
}
}
消费端
@Slf4j
@Component
public class PriorityConsumer {
@RabbitListener(queues = "priority_queue")
public void receiveConfirmMessage(Message message){
log.warn("排序处理!消息:{}", new String(message.getBody()));
}
}
生产消息测试
@RestController
@RequestMapping("/priority")
public class PrioritySendMsgController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/sendMsg/{message}")
private String sendMsg(@PathVariable("message") String message){
MessageProperties messageProperties = new MessageProperties();
Message msg = null;
for(int i=1; i<11; i++){
if(i==5){
//优先级以小为优先
messageProperties.setPriority(1);
}else {
messageProperties.setPriority(10);
}
msg = new Message((message+":"+i).getBytes(StandardCharsets.UTF_8), messageProperties);
rabbitTemplate.convertAndSend("","priority_queue",msg.getBody());
}
return "发布成功!!!";
}
}
3.惰性队列
从3.6.0开始引入惰性队列,功能是将消息最大限度存储到磁盘,当消息被消费时才被加载进内存。但是默认情况,生产者发送消息到队列时会最大限度存储到内存,这是为了更高效处理消息
优点:有磁盘兜底可以大量地存储消息、使消息更加可靠(比如防止宕机丢失)
缺点:在被写入磁盘时依然会在内存备份,当mq释放内存时会将内存的消息换页到磁盘,这个操作比较消耗内存,也会阻塞队列导致接受不到消息,尤其消息量高时问题明显
总结:常规消息量高效处理可以用默认队列(default mode);解决消息堆积可以用惰性队列(lazy mode),好文推荐https://blog.csdn.net/TheWindOfSon/article/details/130808424?spm=1001.2101.3001.6650.2&utm_medium=distribute.pc_relevant.none-task-blog-2%7Edefault%7EYuanLiJiHua%7EPosition-2-130808424-blog-104037059.235%5Ev38%5Epc_relevant_anti_vip_base&depth_1-utm_source=distribute.pc_relevant.none-task-blog-2%7Edefault%7EYuanLiJiHua%7EPosition-2-130808424-blog-104037059.235%5Ev38%5Epc_relevant_anti_vip_base&utm_relevant_index=5
控制台配置信息
代码配置信息
@Configuration
public class LazyModeConfig {
@Bean
public Queue queueLazyMode(){
return QueueBuilder.durable("lazy_mode_queue").withArgument("x-queue-mode", "lazy").build();
}
}
消息消费
@Slf4j
@Component
public class LazyModeConsumer {
@RabbitListener(queues = "lazy_mode_queue")
public void receiveConfirmMessage(Message message){
log.warn("懒惰队列!消息:{}", new String(message.getBody()));
}
}
消息生产
@RestController
@RequestMapping("/lazyMode")
public class LazyModeController {
@Autowired
private RabbitTemplate rabbitTemplate;
// lazyMode队列的可执行测试
@GetMapping("/sendMsg/{message}")
private String sendMsg(@PathVariable("message") String message){
//obj参数如果是非Message类型实例则默认执行消息持久化
rabbitTemplate.convertAndSend("","lazy_mode_queue",message);
return "发布成功!!!";
}
// lazyMode队列消息是否可持久化测试
@GetMapping("/sendMsg2/{message}")
private String sendMsg2(@PathVariable("message") String message){
//obj参数设置Message类型实例,交付模式为非持久化。如果mq宕机恢复后消息依然存在则说明lazyMode队列确实是做了消息的持久化
//测试流程:
// 1.关闭消费者LazyModeConsumer(测试结束后恢复开启)
// 2.执行接口
// 3.查看控制台,是否有待消费消息
// 4.如果有,stop mq服务
// 5.start mq服务,观察待消费消息是否依然存在
MessageProperties messageProperties = new MessageProperties();
messageProperties.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
Message msg = new Message((message).getBytes(StandardCharsets.UTF_8), messageProperties);
rabbitTemplate.convertAndSend("","lazy_mode_queue",msg.getBody());
return "发布成功!!!";
}
}
创建Message类型实例发送消息,通过模拟宕机测试出的结论:消息不会丢失,持久化得到了实际验证
6.4.交付模式(delivery mode)
delivery mode是指生产者发送消息的属性是否支持持久化,要么是none_persistent、要么是persistent
七、集群
91-Shovel_哔哩哔哩_bilibili
待续