MQ如何保证可靠性

news2025/1/10 20:24:46

       📝个人主页:五敷有你      

 🔥系列专栏:MQ

⛺️稳中求进,晒太阳

消息到达MQ以后,如果MQ不能及时保存,也会导致消息丢失,所以MQ的可靠性也非常重要。

2.数据持久化

为了提高性能,默认情况下,MQ的数据都是再内存存储的临时数据,重启后就会消失,为了保证数据的可靠性,必须配置数据持久化,包括:

  • 交换机持久化

  • 队列持久化

  • 消息持久化

我们以控制台界面为例来说明。

2.1.交换机持久化

在控制台的Exchanges页面,添加交换机时可以配置交换机的Durability参数:

image.png

设置为Durable就是持久化模式,Transient就是临时模式。

2.2.队列持久化

在控制台的Queues页面,添加队列时,同样可以配置队列的Durability参数:

image.png

除了持久化以外,你可以看到队列还有很多其它参数,有一些我们会在后期学习。

2.3.消息持久化

在控制台发送消息的时候,可以添加很多参数,而消息的持久化是要配置一个properties

image.png

说明

        在开启持久化机制以后,如果同时还开启了生产者确认,那么MQ会在消息持久化以后才发送ACK回执,进一步确保消息的可靠性。

        不过出于性能考虑,为了减少IO次数,发送到MQ的消息并不是逐条持久化到数据库的,而是每隔一段时间批量持久化。一般间隔在100毫秒左右,这就会导致ACK有一定的延迟,因此建议生产者确认全部采用异步方式。

 代码层次实现:

 @RabbitListener(bindings = @QueueBinding(
                value = @Queue(name = "msg.queue",durable ="true"),
                exchange = @Exchange(name = "msg.topic",type = ExchangeTypes.TOPIC,durable = "true"),
                key = "msg"
        ))
        public void listenMsg(String jsonStr){
            log.info("收到消息{}", jsonStr);
            Map<String,Object> map = JSONUtil.toBean(jsonStr, Map.class);
            JSONObject object=new JSONObject(map);
            String actionName =object.getString(Action.ACTION);
            Action action = getAction(actionName);
            action.doMessage(getWebSocketManager(),object);
        }

3.消费者的可靠性

当RabbitMQ向消费者投递消息以后,需要知道消费者的处理状态如何。因为消息投递给消费者并不代表就一定被正确消费了,可能出现的故障有很多,比如:

  • 消息投递的过程中出现了网络故障

  • 消费者接收到消息后突然宕机

  • 消费者接收到消息后,因处理不当导致异常

RabbitMQ如何得知消费者的处理状态呢?

3.1消费者确认机制

        为了确认消费者是否成功处理消息,RabbitMQ提供了消费者确认机制(Consumer Acknowledgement)。即:当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态。回执有三种可选值:

  • ack:成功处理消息,RabbitMQ从队列中删除该消息

  • nack:消息处理失败,RabbitMQ需要再次投递消息

  • reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息

        一般reject方式用的较少,除非是消息格式有问题,那就是开发问题了。因此大多数情况下我们需要将消息处理的代码通过try catch机制捕获,消息处理成功时返回ack,处理失败时返回nack.

        由于消息回执的处理代码比较统一,因此SpringAMQP帮我们实现了消息确认。并允许我们通过配置文件设置ACK处理方式,有三种模式:

  • none:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用

测试下面代码验证: 在none情况下,异常产生后,消息队列中的消息被删除了

  @RabbitListener(bindings = @QueueBinding(
                value = @Queue(name = "msg.queue",durable ="true"),
                exchange = @Exchange(name = "msg.topic",type = ExchangeTypes.TOPIC,durable = "true"),
                key = "msg"
        ))
        public void listenMsg(String jsonStr){
            log.info("收到消息{}", jsonStr);
            throw new RuntimeException("异常");


}

  • manual:手动模式。需要自己在业务代码中调用api,发送ackreject,存在业务入侵,但更灵活

  • auto:自动模式。SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack. 当业务出现异常时,根据异常判断返回不同结果:

    • 如果是业务异常,会自动返回nack

    • 如果是消息处理或校验异常,自动返回reject;

测试下面代码验证: 在auto情况下,异常产生后,消息一直在被重复投递,

在异常位置打断点,再次发送消息,程序卡在断点时,可以发现此时消息状态为unacked(未确定状态):放行以后,由于抛出的是业务异常,所以Spring返回ack,最终消息恢复至Ready状态,并且没有被RabbitMQ删除: 这个一直Unack的状态。当我们把配置改为auto时,消息处理失败后,会回到RabbitMQ,并重新投递到消费者。

如果是消息转换异常,spring会返回reject

刚刚发现,当消费者出现异常后,消息会不断的requeue(重入队)到队列,再重新发送给消费者,如果消费者执行依然出错,消息会再次投递到队列,直到处理成功为止。

极端情况下,消费之一直无法执行成功,那么消息requeue就会无限循环,导致mq的处理消息飙升,带来不必要的压力。

3.2.失败重试机制

为了应对上述情况Spring又提供了消费者失败重试机制:在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 1000ms # 初识的失败等待时长为1秒
          multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
          max-attempts: 3 # 最大重试次数
          stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

重启consumer服务,重复之前的测试。可以发现:

  • 消费者在失败后消息没有重新回到MQ无限重新投递,而是在本地重试了3次

  • 本地重试3次以后,抛出了AmqpRejectAndDontRequeueException异常。查看RabbitMQ控制台,发现消息被删除了,说明最后SpringAMQP返回的是reject

试了三次,失败就停下来了。

结论:

  • 开启本地重试时,消息处理过程中抛出异常,不会requeue到队列,而是在消费者本地重试

  • 重试达到最大次数后,Spring会返回reject,消息会被丢弃

3.2失败处理策略

本地测试达到最大重试次数后,消息会被丢弃。这在某些对于消息可靠性要求较高的业务场景下,显然不太合适了。因此Spring允许我们自定义重试次数耗尽后的消息处理策略,这个策略是由MessageRecovery接口来定义的,它有3个不同实现:

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式

  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队

  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机

比较优雅的一种处理方案是RepublishMessageRecoverer,失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理。

1)在consumer服务中定义处理失败消息的交换机和队列

@Bean
public DirectExchange errorMessageExchange(){
    return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
    return new Queue("error.queue", true);
}
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
    return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
}

2)定义一个RepublishMessageRecoverer,关联队列和交换机

@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
    return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}
package com.aqiuo.config;

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true")
public class ErrorMessageConfig {

    @Bean
    public TopicExchange errorMessageExchange(){
            return new TopicExchange("error.topic",true,false);
    }
    @Bean
    public Queue errorQueue(){
        return new Queue("error.queue", true);
    }

    @Bean
    public Binding errorBinding(Queue errorQueue, TopicExchange errorMessageExchange){
        return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
    }

    @Bean
    public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
        return new RepublishMessageRecoverer(rabbitTemplate,"error.direct","error");
    }


}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1645629.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【C语言】文件操作(万字解读超详细解析)

最好的时光&#xff0c;在路上;最好的生活&#xff0c;在别处。独自上路去看看这个世界&#xff0c;你终将与最好的自己相遇。&#x1f493;&#x1f493;&#x1f493; 目录 • ✨说在前面 &#x1f34b;知识点一&#xff1a;什么是文件&#xff1f; • &#x1f330;1.程序…

[渗透利器]某大佬公开自用红队渗透工具

前言 看到群里大佬发的文章&#xff0c;公开了自用的工具&#xff0c;前来拜膜一下。 使用方式 该工具首先需要初始化数据库&#xff0c;Windows推荐使用PHP Study&#xff0c;搭建更方便。 修改默认root密码后新建数据库&#xff0c;账号密码随便填&#xff0c;公网环境注意…

表空间的概述

目录 表空间的属性 表空间的类型 永久性表空间(PermanentTablespace) 临时表空间(Temp Tablespace ) 撤销表空间(Undo Tablespace) 大文件表空间(BigfileTablespace) 表空间的状态 联机状态(Online) 读写状态(Read Write) 只读状态(Read) 脱机状态(Offline) Oracle从…

[ 项目 ] tcmalloc简化版—高并发内存池

目录 前言 基本介绍 高并发 内存池 定长内存池 基本介绍 框架设计 具体实现 性能测试 整体框架介绍 申请内存过程 threadcache 1.基本介绍 2.具体实现 centralcache 1.基本介绍 2.具体实现 pagecache 1.基本介绍 2.具体实现 申请内存连通 释放内存过…

百科词条创建机构有哪些?

在互联网时代&#xff0c;百度百科作为我国最大的中文百科全书&#xff0c;已经成为人们获取知识、查询信息的重要途径。随着百度百科影响力的不断扩大&#xff0c;越来越多的人和企业试图通过创建企业词条来提升自身知名度&#xff0c;企业和个人为了在百度百科上占据一席之地…

Java_从入门到JavaEE_09

一、构造方法/构造器 含义&#xff1a;和new一起是创建对象的功能 特点&#xff1a; 与类名相同的方法没有返回项 注意&#xff1a; 当类中没有写构造方法时&#xff0c;系统会默认添加无参构造&#xff08;无参数的构造方法&#xff09;构造方法可以重载的 有参构造好处&…

Linux 第二十章

&#x1f436;博主主页&#xff1a;ᰔᩚ. 一怀明月ꦿ ❤️‍&#x1f525;专栏系列&#xff1a;线性代数&#xff0c;C初学者入门训练&#xff0c;题解C&#xff0c;C的使用文章&#xff0c;「初学」C&#xff0c;linux &#x1f525;座右铭&#xff1a;“不要等到什么都没有了…

Transformer详解:从放弃到入门(一)

Transformer由论文《Attention is All You Need》提出&#xff0c;是一种用于自然语言处理&#xff08;NLP&#xff09;和其他序列到序列&#xff08;sequence-to-sequence&#xff09;任务的深度学习模型架构&#xff0c;在自然语言处理领域获得了巨大的成功&#xff0c;在这个…

自动化运维工具---Ansible

一 Puppet Puppet是历史悠久的运维工具之一。它是一种基础架构即代码(laC)工具&#xff0c;使用户可以定义其基础 架构所需的状态&#xff0c;并使系统自动化以实现相同状态。 Puppet可监视用户的所有系统&#xff0c;并防止任何偏离已定义状态的情况。从简单的工作流程自动…

数据仓库基础理论(学习笔记)

数据仓库基础理论 1.数据仓库概念 2.数据仓库为何而来 3.数据仓库主要特征 4.OLTP、OLAP系统 5.数据仓库与数据库的区别 6.数据仓库与数据集市的区别 7.数据仓库分层架构 7.1为什么要分层&#xff1f; 8.ETL、ELT

NFS共享存储服务配置实践

一、NFS 1.NFS定义 NFS&#xff08;Network File System&#xff09;网络文件服务&#xff1a;基于TCP/IP传输的网络文件系统协议&#xff0c;NFS服务的实现依赖于RPC&#xff08;Remote Process Call&#xff09;远端过程调用&#xff1a;通过使用NFS协议&#xff0c;客户机…

ICode国际青少年编程竞赛- Python-1级训练场-变量入门

ICode国际青少年编程竞赛- Python-1级训练场-变量入门 1、 a 4 Dev.turnRight() Dev.step(a)2、 a 4 Spaceship.step(a) Dev.step(a)3、 a 4 Dev.step(a) Dev.turnLeft() Dev.step(a)4、 a 5 Dev.step(a) Spaceship.step(a) Dev.step(a)5、 a 3 Dev.step(a) Dev.tur…

C语言写的LLM训练

特斯拉前 AI 总监、OpenAI 创始团队成员 Andrej Karpathy 用 C 代码完成了 GPT-2 大模型训练过程&#xff1a;karpathy/llm.c: LLM training in simple, raw C/CUDA (github.com) 下载源码 git clone --recursive https://github.com/karpathy/llm.c.git下载模型 从HF-Mirro…

Burp和Proxifier抓包微信小程序

1、Burp设置代理 2、浏览器下载证书 3、安装证书 4、Proxifier设置代理 5、Proxifier设置Proxification Rule 6、Burp查看抓包数据 打开一个小程序&#xff0c;可以看到WeChatAppEx的流量先经过Proxifier&#xff0c;再经过127.0.0.1:8080到Burp

基于现有语言大模型,定制“人人AI气象”公众号天气助手

最近&#xff0c;月之暗面的Kimi大模型非常受欢迎&#xff0c;尝试用了moonshot(128K)基座模型&#xff0c;通过调用各种公开渠道的API&#xff0c;简易实现了一个天气助手&#xff0c;可以回答天气相关的基础概念、原理、应用等方面的问题&#xff0c;同时也可调用多个插件获取…

音频可视化:原生音频API为前端带来的全新可能!

音频API是一组提供给网页开发者的接口&#xff0c;允许他们直接在浏览器中处理音频内容。这些API使得在不依赖任何外部插件的情况下操作和控制音频成为可能。 Web Audio API 可以进行音频的播放、处理、合成以及分析等操作。借助于这些工具&#xff0c;开发者可以实现自定义的音…

MoonBit 开源之夏重磅来袭!12000元奖金等你来拿!

宣讲视频 MoonBit 开源之夏宣讲视频 关于我们 开源之夏 「开源之夏 (OSPP)」是中科院软件所「开源软件供应链点亮计划」指导下的系列暑期活动&#xff0c;旨在鼓励在校学生积极参与开源软件的开发维护&#xff0c;培养和发掘更多优秀的开发者&#xff0c;促进优秀开源软件社区…

JavaScript中Math函数与舍入

立方根 console.log(Math.sqrt(25)); //数学方式25平方根 console.log(25 ** (1 / 2)); //25的0.5次方 console.log(8 ** (1 / 3)); //8的1/3次方计算最大最小值 console.log(Math.max(1, 5, 88, 22, 132)); //返回最大值 console.log(Math.max(1, 5, 88, 22, 132)); //…

【C语言】高质量选择题

目录 题目一&#xff1a; 题目二&#xff1a; 题目三&#xff1a; 题目四&#xff1a; 题目五&#xff1a; 题目六&#xff1a; 题目七&#xff1a; 题目八&#xff1a; 题目九&#xff1a; 题目十&#xff1a; 题目十一&#xff1a; 题目十二&#xff1a; 题目十…

快捷回复软件让你告别回复慢

可能自己是个客服的原因&#xff0c;一连几天大数据给我推了一个叫“客服宝聊天助手”的软件。用了几天真心觉得好用&#xff0c;能解决我回客户很慢的困扰。如果大家对快捷回复软件感兴趣&#xff0c;可以接着了解哦&#xff01; 一、减少复制粘贴 传统的客服工作中&#xff…