RabbitMQ生产者的可靠性

news2024/12/24 21:30:22

目录

MQ使用时会出现的问题

生产者的可靠性

1、生产者重连

2、生产者确认

3、数据持久化

交换机持久化

队列持久化

消息持久化

LazyQueue懒加载


MQ使用时会出现的问题

  • 发送消息时丢失:
    • 生产者发送消息时连接MQ失败
    • 生产者发送消息到达MQ后未找到Exchange
    • 生产者发送消息到达MQ的Exchange后,未找到合适的Queue
    • 消息到达MQ后,处理消息的进程发生异常
  • MQ导致消息丢失:
    • 消息到达MQ,保存到队列后,尚未消费就突然宕机
  • 消费者处理消息时:
    • 消息接收后尚未处理突然宕机
    • 消息接收后处理过程中抛出异常

生产者的可靠性

1、生产者重连

首先第一种情况,就是生产者发送消息时,出现了网络故障,导致与MQ的连接中断。为了解决这个问题,SpringAMQP提供的消息发送时的重试机制。即:当RabbitTemplate与MQ连接超时后,多次重试。

在生产者yml文件添加配置开启重试机制

spring:
  rabbitmq:
    connection-timeout: 1s # 设置MQ的连接超时时间
    template:
      retry:
        enabled: true # 开启超时重试机制
        initial-interval: 1000ms # 失败后的初始等待时间
        multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multiplier
        max-attempts: 3 # 最大重试次数

关掉RabbitMQ服务:

docker stop mq

然后测试发送一条消息,会发现会每隔1秒重试1次,总共重试了3次。消息发送的超时重试机制配置成功了!

注意:当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率。不过SpringAMQP提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,当前线程是被阻塞的。 如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长和重试次数,当然也可以考虑使用异步线程来执行发送消息的代码 

2、生产者确认

一般情况下,只要生产者与MQ之间的网路连接顺畅,基本不会出现发送消息丢失的情况,因此大多数情况下我们无需考虑这种问题。 不过,在少数情况下,也会出现消息发送到MQ之后丢失的现象,比如:

  • MQ内部处理消息的进程发生了异常
  • 生产者发送消息到达MQ后未找到Exchange
  • 生产者发送消息到达MQ的Exchange后,未找到合适的Queue,因此无法路由

针对上述情况,RabbitMQ提供了生产者消息确认机制,包括Publisher ConfirmPublisher Return两种。在开启确认机制的情况下,当生产者发送消息给MQ后,MQ会根据消息处理的情况返回不同的回执

总结如下:

  • 当消息投递到MQ,但是路由失败时,通过Publisher Return返回异常信息,同时返回ack的确认信息,代表投递成功
  • 临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功
  • 持久消息投递到了MQ,并且入队完成持久化,返回ACK ,告知投递成功
  • 其它情况都会返回NACK,告知投递失败

其中acknack属于Publisher Confirm机制,ack是投递成功;nack是投递失败。而return则属于Publisher Return机制。 默认两种机制都是关闭状态,需要通过配置文件来开启。

 在生产者yml文件添加配置开启重试机制

spring:
  rabbitmq:
    publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型
    publisher-returns: true # 开启publisher return机制

这里publisher-confirm-type有三种模式可选:

  • none:关闭confirm机制
  • simple:同步阻塞等待MQ的回执
  • correlated:MQ异步回调返回回执

一般我们推荐使用correlated,回调机制。

每个RabbitTemplate只能配置一个ReturnCallback,因此我们可以在配置类中统一设置。我们在publisher模块定义一个配置类:

@Slf4j
@Configuration
public class MqConfig implements ApplicationContextAware {
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
            @Override
            public void returnedMessage(ReturnedMessage returnedMessage) {
                log.error("触发return callback,");
                log.debug("交换机exchange: {}", returnedMessage.getExchange());
                log.debug("routingKey: {}", returnedMessage.getRoutingKey());
                log.debug("消息message: {}", returnedMessage.getMessage());
                log.debug("replyCode: {}", returnedMessage.getReplyCode());
                log.debug("replyText: {}", returnedMessage.getReplyText());
            }
        });
    }
}

定义ConfirmCallback

由于每个消息发送时的处理逻辑不一定相同,因此ConfirmCallback需要在每次发消息时定义。具体来说,是在调用RabbitTemplate中的convertAndSend方法时,多传递一个参数:这里的CorrelationData中包含两个核心的东西:

  • id:消息的唯一标示,MQ对不同的消息的回执以此做判断,避免混淆
  • SettableListenableFuture:回执结果的Future对象

将来MQ的回执就会通过这个Future来返回,我们可以提前给CorrelationData中的Future添加回调函数来处理消息回执:

我们新建一个测试,向系统自带的交换机发送消息,并且添加ConfirmCallback

 /**
     * 生产者消息确认
     * @throws InterruptedException
     */
    @Test
    public void testPublisherConfirm() throws InterruptedException {
        // 准备消息
        CorrelationData cd = new CorrelationData(UUID.randomUUID().toString());
        cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
            @Override
            public void onFailure(Throwable ex) {
               log.error("消息回调失败",ex);
            }

            @Override
            public void onSuccess(CorrelationData.Confirm result) {
                log.debug("收到confirm callback回执");
                if (result.isAck()){
                    log.debug("发送消息成功,收到 ack!");
                }else {
                    log.debug("发送消息失败,收到 nack, reason : {}", result.getReason());
                }
            }
        });
        // 发送消息
        rabbitTemplate.convertAndSend("mq.direct","blue","hello",cd);

        Thread.sleep(2000);
    }

 当路由不对时,提示没有这个路由

 当交换机不对时,提示没有这个交换机

总结:

SpringAMQP中生产者消息确认的几种返回值情况

1、消息投递到了MQ,但是路由失败。会return路由异常原因,返回ACK

2、临时消息投递到了MQ,并且入队成功,返回ACK

3、持久消息投递到了MQ,并且入队完成持久化,返回ACK

4、其它情况都会返回NACK,告知投递失败

如何处理生产者的确认消息?
1、生产者确认需要额外的网络和系统资源开销,尽量不要使用
2、如果一定要使用,无需开启Publisher-Return机制,因为一般路由失败是自己业务问题
3、对于nack消息可以有限次数重试,依然失败则记录异常消息

3、数据持久化

MQ的可靠性
在默认情况下,RabbitMQ会将接收到的信息保存在内存中以降低消息收发的延迟。这样会导致两个问题

1、MQ宕机,内存中的消息会丢失
2、内存空间有限,当消费者故障或处理过慢时,会导致消息积压,引发MQ阻塞小

为了提升性能,默认情况下MQ的数据都是在内存存储的临时数据,重启后就会消失。为了保证数据的可靠性,必须配置数据持久化,包括:

  • 交换机持久化
  • 队列持久化
  • 消息持久化
交换机持久化

在控制台的Exchanges页面,添加交换机时可以配置交换机的Durability参数:

 设置为Durable就是持久化模式,Transient就是临时模式。

队列持久化

在控制台的Queues页面,添加队列时,同样可以配置队列的Durability参数:

 设置为Durable就是持久化模式,Transient就是临时模式。

消息持久化

在控制台发送消息的时候,可以添加很多参数,而消息的持久化是要配置一个properties

注意:在开启持久化机制以后,如果同时还开启了生产者确认,那么MQ会在消息持久化以后才发送ACK回执,进一步确保消息的可靠性。 不过出于性能考虑,为了减少IO次数,发送到MQ的消息并不是逐条持久化到数据库的,而是每隔一段时间批量持久化。一般间隔在100毫秒左右,这就会导致ACK有一定的延迟,因此建议生产者确认全部采用异步方式。

 @Test
    public void persistence() {
        // 队列名称
        String queueName = "simple.queue";
        // 消息
        //MessageDeliveryMode.NON_PERSISTENT 非持久化
        //MessageDeliveryMode.PERSISTENT 持久化
        Message message = MessageBuilder.withBody("hello".getBytes(StandardCharsets.UTF_8))
                .setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();
        // 发送消息
        rabbitTemplate.convertAndSend(queueName, message);
    }
}

 消息写入磁盘

LazyQueue懒加载

从RabbitMQ的3.6.0版本开始,就增加了Lazy Queues的模式,也就是惰性队列。惰性队列的特征如下:

  • 接收到消息后直接存入磁盘而非内存
  • 消费者要消费消息时才会从磁盘中读取并加载到内存(也就是懒加载)
  • 支持数百万条的消息存储

而在3.12版本之后,LazyQueue已经成为所有队列的默认格式。因此官方推荐升级MQ为3.12版本或者所有队列都设置为LazyQueue模式。

 控制台配置Lazy模式

代码配置Lazy模式

在利用SpringAMQP声明队列的时候,添加x-queue-mod=lazy参数也可设置队列为Lazy模式:

 通过bean声明

@Bean
public Queue lazyQueue(){
    return QueueBuilder
            .durable("lazy.queue")
            .lazy() // 开启Lazy模式
            .build();
}
@RabbitListener(queuesToDeclare = @Queue(
        name = "lazy.queue",
        durable = "true",
        arguments = @Argument(name = "x-queue-mode", value = "lazy")
))
public void listenLazyQueue(String msg){
    log.info("接收到 lazy.queue的消息:{}", msg);
}

也可以基于注解来声明队列并设置为Lazy模式: 

更新已有队列为lazy模式

对于已经存在的队列,也可以配置为lazy模式,但是要通过设置policy实现。 可以基于命令行设置policy:

rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues  

命令解读:

  • rabbitmqctl :RabbitMQ的命令行工具
  • set_policy :添加一个策略
  • Lazy :策略名称,可以自定义
  • "^lazy-queue$" :用正则表达式匹配队列的名字
  • '{"queue-mode":"lazy"}' :设置队列模式为lazy模式
  • --apply-to queues:策略的作用对象,是所有的队列

 也可以在控制台配置policy,进入在控制台的Admin页面,点击Policies,即可添加配置:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1136270.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

linux音频-IIS音频接口

IIS 总线 IIS(Integrate Interface of Sound)即集成音频接口&#xff0c;在上个世纪 80 年代首先被 Philips 公司用于消费产品的音频设备&#xff0c; I2S规范 I2S总线只能用来处理audio data&#xff0c;而别的信号比如控制信号&#xff0c;编码信号则交给别的模块处理。为了…

The Gradle daemon may be trying to use ipv4 instead of ipv6.

重新配置了jdk和gradle&#xff0c;导致新创建的项目都会报该错误。以下是解决办法。 mac的环境 一 改项目JDK的位置 如果一没有解决接着配置 vim ~/.bash_profile 新增 export _JAVA_OPTIONS“-Djava.net.preferIPv4Stacktrue” 参考&#xff1a; https://sites.google.com…

618京东到家APP-门详页反爬实战

一、背景与系统安全需求分析 1. 系统的重要性 上图所示是接口所属位置、对电商平台或在线商店而言,分类查商品都是很重要的,通过为用户提供清晰的商品分类,帮助他们快速找到所需产品,节省浏览时间,提升购物效率,是购物结算产生GMV的核心环节。那么电商平台为什么都很看重…

Creaform形创HandySCAN MAX三维扫描仪大型零部件尺寸测量设备

CASAIM中科院广州电子智能制造事业部连续多年荣获形创Creaform战略级代理商证书。战略级代理商是形创Creaform最高级别的合作伙伴。 2023年CASAIM中科院广州电子智能制造事业部的形创Creaform战略级代理商证书&#xff1a; Creaform 形创是便携式三维测量解决方案和工程服务领…

“摸不着”的数字孪生,如何带来“看得见”的数据效益?

目录 数字孪生的6问6答 01 何为数字孪生&#xff1f; 02 数字孪生的地位与趋势如何&#xff1f; 03 哪些行业在关注和应用数字孪生&#xff1f; 04 数字孪生的应用场景有哪些&#xff1f; 05 数字孪生与智能制造的关系&#xff1f; 06 如何利用数字孪生 第一步&#xf…

QuestPass来袭,500万SUI奖池拉满并降低获奖难度!

自上周Quest 3规调整后&#xff0c;社区的担忧和反馈减少&#xff0c;但是我们仍然看到一些用户在达到资格以及争取奖励方面遇到困难。感谢你们抽出时间与我们反馈这些问题&#xff0c;下面将与你们分享在Quest 3中最新调整的规则信息&#xff1a; 新增QuestPass 为了使任何人…

C语言系统化精讲(五):C语言格式化输入和运算符与表达式

文章目录 一、C语言格式化输入1.1 C语言scanf&#xff1a;读取从键盘输入的数据&#xff08;含输入格式汇总表&#xff09;1.2 C语言输入字符和字符串&#xff08;所有函数大汇总&#xff09;1.2.1 输入单个字符1.2.2 输入字符串 二、运算符与表达式2.1 运算符与表达式2.1.1 运…

我用好说 AI 画出了漫画故事

现在的 AI 有多神奇&#xff1f;我已经可以用它来 “想故事、写分镜、画漫画” 了。 这里就来秀秀用 好说 AI 做出来的一些漫画&#xff1a; 大家多少都遇到过&#xff1a;曾经有个不错的想法&#xff0c;可能只是 “一个场景”、“一句话”&#xff0c;但真的就一闪而过&…

《算法通关村——黄金挑战数组问题》

《算法通关村——黄金挑战数组问题》 数组中出现次数超过一半的数字 描述 数组中有一个数字出现的次数超过数组长度的一半&#xff0c;请找出这个数字。 例如&#xff1a;输入如下所示的一个长度为9的数组{1,2,3,2,2,2,5,4,2}。由于数字2在数组中出现了5次&#xff0c;超过数…

软件测试面试1000问(含答案+文档)

Part1 1、你的测试职业发展是什么&#xff1f; 测试经验越多&#xff0c;测试能力越高。所以我的职业发展是需要时间积累的&#xff0c;一步步向着高级测试工程师奔去。而且我也有初步的职业规划&#xff0c;前3年积累测试经验&#xff0c;按如何做好测试工程师的要点去要求自…

Python基础教程:关于序列操作的方式方法

嗨喽~大家好呀&#xff0c;这里是魔王呐 ❤ ~! python更多源码/资料/解答/教程等 点击此处跳转文末名片免费获取 序列是指按照位置顺序来存储数据的数据结构&#xff0c;也就是说能通过数值索引进行操作。 实际上&#xff0c;python对序列的解释是&#xff1a;只要类型对象中…

C语言系统化精讲(七):C语言数组详解

文章目录 一、数组的基本概念二、一维数组2.1 一维数组的定义2.2 一维数组初始化2.3 一维数组的引用 三、二维数组3.1 二维数组的定义3.2 二维数组初始化3.3 二维数组的引用 四、C语言数组是静态的&#xff0c;不能插入或删除元素五、C语言数组的越界和溢出5.1 数组越界5.2 数组…

C++基础:函数模板

为了代码重用&#xff0c;代码必须是通用的&#xff1b;通用的代码就必须不受数据类型的限制。那么我们可以把数据类型改为一个设计参数&#xff0c;这种类型的程序设计称为参数化程序设计&#xff0c;软件模板有模板构造&#xff0c;包括函数模板和类模板。 函数模板可以用来…

分享一下怎么做一个签到积分的微信小程序

在微信小程序中&#xff0c;签到积分功能是一种非常实用的功能&#xff0c;它可以帮助企业吸引用户&#xff0c;增加用户的忠诚度和活跃度。下面将介绍如何设计和实现一个签到积分的微信小程序。 一、设计页面 签到积分微信小程序的页面设计应该简洁明了&#xff0c;操作简单。…

error: the following arguments are required: --model, --data 解决方法

错误原因&#xff1a;Windows下需要缺乏配置参数&#xff0c;需要进行相关参数配置。 解决办法&#xff1a;在Pycharm的编辑设置&#xff0c;加上–model--model ****,其中****为指定的模型名称&#xff0c;按照自己实际报错进行添加&#xff0c;比如我这里要跑的模型为bert&am…

如何让元素在页面中完美居中?看这篇文章就够了!

&#x1f3ac; 江城开朗的豌豆&#xff1a;个人主页 &#x1f525; 个人专栏 :《 VUE 》 《 javaScript 》 &#x1f4dd; 个人网站 :《 江城开朗的豌豆&#x1fadb; 》 ⛺️ 生活的理想&#xff0c;就是为了理想的生活 ! ​ 目录 ⭐ 专栏简介 &#x1f4d8; 文章引言 一…

Python爬虫防止被封的方法:动态代理ip

目录 前言 一、为什么需要使用动态IP代理 1.网站反爬虫机制 2.突破本地IP限制 3.获取更多数据 二、Python爬虫动态IP代理的实现方法 1.使用第三方库 2.使用爬虫框架 三、预防被封的方法 1.代理池管理 2.请求间隔设置 3.使用多个代理 总结 前言 在进行网站爬取时&…

四周年,创始人李亚飞写给 ShowMeBug 用户的一封信

见信如晤。 此刻&#xff0c;我心怀感激&#xff0c;感谢您对 ShowMeBug 的支持和鼓励。ShowMeBug 在这几年能够在剧烈竞争的环境中存活下来&#xff0c;证明了大家对我们的认可&#xff0c;在此由衷感谢大家。 今天我怀着前所未有的满足感&#xff0c;想分享下 ShowMeBug 成…

共享购模式:重新定义电商购物体验

在当今的电商市场&#xff0c;消费者对购物体验的需求日益增长&#xff0c;他们不再满足于传统的电商模式。为此&#xff0c;共享购模式应运而生&#xff0c;这种创新模式将线下实体商业与线上虚拟商城相结合&#xff0c;为用户带来全新的购物体验。本文将详细讲解共享购模式的…

Mac版好用的Git客户端 Fork 免激活

Fork是一款强大的Git客户端软件&#xff0c;在Mac和Windows操作系统上都可以使用。汇集了众多先进的功能和工具&#xff0c;可以帮助用户更方便地管理和控制Git仓库。 Fork的界面简洁直观&#xff0c;易于使用。它提供了许多高级的Git功能&#xff0c;如分支管理、合并、提交、…