消息队列高级

news2024/11/23 7:13:31

目录

 消息可靠性

生产者消息确认 

 第一步:修改application.yml配置文件信息

 第二步:定义发送者确认confirm回调方法

 第三步:创建消息发送者回执return回调方法(确保消息从交换机到消息队列)

总结:

消息持久化 

消费者消息确认 

SpringAMQP则允许配置三种确认模式:

auto问题

 消费者是失败重试

本地重试

失败策略

总结


消息队列在使用时,有以下的问题需要考虑

  • 消息可靠性问题(一个消息至少被消费一次)
  • 延迟消息问题
  • 高可用问题
  • 消息堆积问题

 消息可靠性

消息从发送,到消费者接收,会经理多个过程:

 

其中的每一步都可能导致消息丢失,常见的丢失原因包括:

  • 发送时丢失:
    • 生产者发送的消息未送达exchange
    • 消息到达exchange后未到达queue
  • MQ宕机,queue将消息丢失
  • consumer接收到消息后未消费就宕机

针对这些问题,RabbitMQ分别给出了解决方案:

  • 生产者确认机制
  • mq持久化
  • 消费者确认机制
  • 失败重试机制

生产者消息确认 

RabbitMQ提供了publisher confirm机制来避免消息发送到MQ过程中丢失。这种机制必须给每个消息指定一个唯一ID。消息发送到MQ以后,会返回一个结果给发送者,表示消息是否处理成功。 

这个可以保证消息发送者成功发送消息到交换机以及消息队列中 

有两种返回结果:

第一种:确认消息是否发送到交换机--》publisher-confirm

  • publisher-confirm,发送者确认
    • 消息成功投递到交换机,返回ack
    • 消息未投递到交换机,返回nack

第二种:确认消息是否成功从交换机路由到对应的消息队列中--》publisher-return

  • publisher-return,发送者回执
    • 消息投递到交换机了,但是没有路由到队列。返回ACK,及路由失败原因。

 

 第一步:修改application.yml配置文件信息

spring:
  rabbitmq:
    host: 192.168.230.100 # rabbitMQ的ip地址
    port: 5672 # 端口
    username: hhh
    password: 1234
    virtual-host: /
    publisher-confirm-type: correlated
    publisher-returns: true
    template:
      mandatory: true

说明:

  • publish-confirm-type:开启publisher-confirm,这里支持两种类型:
    • simple:同步等待confirm结果,直到超时
    • correlated:异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback
  • publish-returns:开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallback
  • template.mandatory定义消息路由失败时的策略。true,则调用ReturnCallback回调方法;false:则直接丢弃消息

 第二步:定义发送者确认confirm回调方法

@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        //获取rabbitTemplate
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);

        //设置发送者确认回调方法
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            /**
             *
             * @param correlationData 自定义的消息
             * @param ack ack确认,true为发送到交换机成功
             * @param s 失败原因
             */
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String s) {
                if(ack){
                    log.info("消息到达交换机");
                }else {
                    log.error("消息没有到交换机,原因为:{}",s);
                }
            }
        });
    }
}

发送消息:

    @Test
    public void test01() throws InterruptedException {
        String routingKey="simple";
        String message="hello spring";
        rabbitTemplate.convertAndSend("mqAd.topic",routingKey,message);
        Thread.sleep(2000); //让主线程休眠2s,返回回调方法还没执行,主线程就关闭
    }

发送失败触发消息确认的回调方法,因为我现在没有创建maAd.topic交换机,所以无法发送消息到交换机中

创建交换机 

 @Bean
    public TopicExchange topicExchange(){
        return new TopicExchange("mqAd.topic");
    }

 

 第三步:创建消息发送者回执return回调方法(确保消息从交换机到消息队列)

 //设置消息发送者回执回调方法,交换机路由消息队列错误才会回调
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
//            *
//             *
//             * @param message 返回的信息
//             * @param i 回复的状态码
//             * @param s 回复内容
//             * @param s1 交换机
//             * @param s2 路由

            @Override
            public void returnedMessage(Message message, int i, String s, String s1, String s2) {
                log.error("路由定位失败,{},{},{},{},{}",message,i,s,s1,s2);
            }
        });

这时候没有对应的消息队列,会触发消息回执return 回调方法

路由定位失败,(Body:'hello spring' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]),312,NO_ROUTE,mqAd.topic,simple

创建消息队列并绑定 

    @Bean
    public TopicExchange topicExchange(){
        return new TopicExchange("mqAd.topic");
    }
    @Bean
    public Queue simpleQueue(){
        return new Queue("mqAd.simple.queue");
    }
    @Bean
    public Binding binding(){
        return BindingBuilder.bind(simpleQueue()).to(topicExchange()).with("simple");
    }

总结:

1.消息确认confirm回调方法触发:不论消息是否成功到达交换机都会触发,成功返回的是ack,失败返回的是nack

2.消息回执return回调方法触发:只有交换机路由消息队列失败时才会触发

消息持久化 

防止mq宕机,然后消息队列里面的消息全部丢失

生产者确认可以确保消息投递到RabbitMQ的队列中,但是消息发送到RabbitMQ以后,如果突然宕机,也可能导致消息丢失。

要想确保消息在RabbitMQ中安全保存,必须开启消息持久化机制。

  • 交换机持久化
  • 队列持久化
  • 消息持久化

 实际上默认交换机和队列,消息都是持久化的

    @Bean
    public TopicExchange topicExchange(){
        return new TopicExchange("mqAd.topic",true,false);
    }
    @Bean
    public Queue simpleQueue(){
        return new Queue("mqAd.simple.queue",true);

持久化标识: 

消费者消息确认 

RabbitMQ是阅后即焚机制,RabbitMQ确认消息被消费者消费后会立刻删除。

而RabbitMQ是通过消费者回执来确认消费者是否成功处理消息的:消费者获取消息后,应该向RabbitMQ发送ACK回执,表明自己已经处理消息。

设想这样的场景:

  • 1)RabbitMQ投递消息给消费者
  • 2)消费者获取消息后,返回ACK给RabbitMQ
  • 3)RabbitMQ删除消息
  • 4)消费者宕机,消息尚未处理

SpringAMQP则允许配置三种确认模式:

•manual:手动ack,需要在业务代码结束后,调用api发送ack。

•auto:自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack

•none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除

由此可知:

  • none模式下,消息投递是不可靠的,可能丢失
  • auto模式类似事务机制,出现异常时返回nack,消息回滚到mq;没有异常,返回ack
  • manual:自己根据业务情况,判断什么时候该ack

一般,我们都是使用默认的auto即可。

logging:
  pattern:
    dateformat: HH:mm:ss:SSS
  level:
    cn.itcast: debug
spring:
  rabbitmq:
    host: 192.168.230.100 # rabbitMQ的ip地址
    port: 5672 # 端口
    #addresses: 192.168.150.101:8071, 192.168.150.101:8072, 192.168.150.101:8073
    username: hhh
    password: 1234
    virtual-host: /
    listener:
      simple:
        prefetch: 1
        acknowledge-mode: auto

auto问题

 auto会有一种问题,就会监听方法出现异常,消息队列的消息就无法正常处理,那么spring会自动发消息到mq说明这个消息没有被正常消费,不要删除,然后mq又会把消息重新返回队列,而监听方法又会监听到这个消息队列的信息,然后监听方法又出现异常,消息又回到消息队列,这就在项目中进行无限次的循环重试

 消费者是失败重试

当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者,然后再次异常,再次requeue,无限循环,导致mq的消息处理飙升,带来不必要的压力: 

本地重试

我们可以利用Spring的retry机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。

修改consumer服务的application.yml文件,添加内容:

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 1000 # 初识的失败等待时长为1秒
          multiplier: 2 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
          max-attempts: 4 # 执行次数
          stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

失败-->1s后重试1次-->2秒后重试第二次-->4秒后重试第三次

每次失败的等待时长是之前的2倍数 ,执行次数包括第一次失败

注意:重试次数结束之后还没有成功,消息队列的消息就会被删除 

失败策略

在之前的测试中,达到最大重试次数后,消息会被丢弃,这是由Spring内部机制决定的。

在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecovery接口来处理,它包含三种不同的实现:

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式

  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队

  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机

比较优雅的一种处理方案是RepublishMessageRecoverer,失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理。

声明处理错误消息的交换机和消息队列,并且声明一个RepublishMessageRecoverer bean,告诉spring使用的是这种失败策略

@Configuration
public class ErrorMessageConfig {

    @Bean
    public DirectExchange errorMessageExchange(){
        return new DirectExchange("error.direct");
    }

    @Bean
    public Queue errorQueue(){
        return new Queue("error.queue");
    }

    @Bean
    public Binding errorMessageBinding(){
        return BindingBuilder.bind(errorQueue()).to(errorMessageExchange()).with("error");
    }

    //设置消息失败处理策略
    @Bean
    public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
        return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
    }
}

重复处理三次之后还是错误,就把这个消息发送到 消息错误交换机中

 

 

总结

如何确保RabbitMQ消息的可靠性?

  • 开启生产者确认,回执机制,确保生产者的消息能到达队列
  • 开启持久化功能,确保消息未消费前在队列中不会丢失
  • 开启消费者确认机制为auto,由spring确认消息处理成功后完成ack
  • 开启消费者失败重试机制,并设置MessageRecoverer,多次重试失败后将消息投递到异常交换机,交由人工处理

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2238336.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

宏观经济学笔记

【拯救者】宏观经济学速成 国民生产总值GNP: GNP 衡量一国(地区)成员在一定时期内运用生产要素所生产的全部最终产品和服务的市场价值。凡是本国国民所 创造的收入,不管生产要素是否在国内,都计入本国GNP中。 GDP本国居民在本国创造的价值外国居民在本国…

ONLYOFFICE 8.2测评:功能增强与体验优化,打造高效办公新体验

引言 随着数字化办公需求的不断增长,在线办公软件市场竞争愈加激烈。在众多办公软件中,ONLYOFFICE 无疑是一个颇具特色的选择。它不仅支持文档、表格和演示文稿的在线编辑,还通过开放的接口与强大的协作功能,吸引了众多企业和个人…

独显装完ubuntu后启动黑屏显示/dev/sda:clean files blocks的解决方案

解决方案如下: 选中Ubuntu按E键 在编辑界面倒数第2行的linux那行(后面有quiet splash选项)的最后添加nomodeset 然后按F10保存重启 然后管理员权限打开/etc/modprobe.d/blacklist.conf,在文件末尾添加: blacklist…

[Docker#2] 发展历史 | Namespace环境隔离 | Cgroup资源控制

目录 1.发展历史 Jail 时代 云时代 云原生时代 技术标准的确立 虚拟机 vs Docker 2. 容器化技术 2.1 Namespace 命令详解 1. dd 命令 2. mkfs 命令 3. df 命令 4. mount 命令 5. unshare 命令 实战 进程隔离 文件隔离 2.2 CGroup 相关命令 2.1 pidstat 2.…

AI生活之我用AI处理Excel表格

AI生活之我用AI处理Excel表格 场景再现AI提问词AI代码运行调试结果心得感受 场景再现 因学习需要,整理了某个题库,方便自己刷题使用。 已将每套题打上了制定标签,得到一个Excel表格。截图如下: 需求是:一共35套题&…

Stable Diffusion Web UI - ControlNet 姿势控制 openpose

openpose 是 ControlNet 中常用的控制模式之一。 通过 openpose 可以锁定人物姿势,把姿势信息传递给 Stable Diffusion 扩散模型,让其在扩散生成图片的时候遵照特定的任务姿势。 通过 openpose 能够得到类似如下效果: 同样的姿势&#xff0…

第三百一十九节 Java线程教程 - Java线程中断

Java线程教程 - Java线程中断 我们可以通过使用interrupt()方法中断一个活动的线程。 这个方法调用在线程只是一个指示。它是由线程如何响应中断。 例子 下面的代码显示了中断主线程并打印线程中断状态的代码。 public class Main {public static void main(String[] args)…

人工智能(AI)和机器学习(ML)技术学习流程

目录 人工智能(AI)和机器学习(ML)技术 自然语言处理(NLP): Word2Vec: Seq2Seq(Sequence-to-Sequence): Transformer: 范式、架构和自注意力: 多头注意力: 预训练、微调、提示工程和模型压缩: 上下文学习、思维链、全量微调、量化、剪枝: 思维树、思维…

Cynet:全方位一体化安全防护工具

前言 1999年,布鲁斯施奈尔曾说过:“复杂性是安全最大的敌人。”彼时还是19年前,而现在,网络安全已然变得更加繁杂。 近日我在网上冲浪过程中发现了这么一个平台性质的软件,看似具有相当强的防护能力。 根据Cynet的描…

可变类型参数

将形参设为可变类型参数,首先自己的函数要先有一个确定的形参,然后剩余的参数为 ... 用到三个宏,va_list, va_start, va_arg . va_list: 当作一个类型,底层是一个char* 被 typedef va_strat: 先定义一个va_list 类型的变量&#x…

AlphaFold3 开源啦!喜大普奔!

2024年5月8日,AlphaFold3 正式发布!时隔半年,今天,AlphaFold3 终于开源啦!🎉 不过别太激动哈哈哈哈哈,权重还是要额外申请的! 半年前,AlphaFold3 的发布激起了学术界的广…

什么是多因素身份验证(MFA)的安全性?

多因素身份验证(MFA)简介 什么是MFA 多因素身份验证(MFA)是一种安全过程,要求用户在授予对系统、应用程序或账户的访问权限之前提供两种或多种形式的验证。仅使用单个因素(通常是用户名和密码)保护资源会使它们容易受到泄露,添加…

Autosar CP Can State Mangement规范导读

CanSM的主要功能 CAN网络通信模式控制 管理CAN网络的启动、停止和不同通信模式(如全通信、静默通信、无通信)之间的切换。通过状态机实现对CAN网络状态的精确控制,确保网络在不同条件下稳定运行。错误处理与状态报告 根据AUTOSAR基础软件的错误分类方案处理错误,包括开发错…

【Python爬虫实战】全面解析 DrissionPage:简化 Python 浏览器自动化的三种模式

🌈个人主页:易辰君-CSDN博客 🔥 系列专栏:https://blog.csdn.net/2401_86688088/category_12797772.html ​ 目录 前言 一、DrissionPage简介 (一)ChromiumPage (二)WebPage &a…

测试驱动:编写完善测试用例的艺术

测试驱动:编写完善测试用例的艺术 如何编写测试用例 如何撰写高效的测试用例,为产品的稳定性和质量保驾护航。无论你是新手还是经验丰富的测试工程师,让我们一起深入探讨,掌握测试用例编写的精髓! 1. 明确测试目标 …

Linux系统编译boot后发现编译时间与Windows系统不一致的解决方案

现象 如下图,从filezilla软件看虚拟机Linux中编译的uboot.img修改时间与Windows系统时间不同 解决过程 在Linux中查看编译的uboot详细信息,从而得到编译时间。终端输入ls -l后,如下图: 结论 说明在Linux是按照Windows系统时…

24.11.10

星期一: 补 23ICPC 合肥 G cf传送门 思路:由使第 k个最大这种条件易联想到二分,但是如何check是个问题 check使用dp,先想到个比较朴素的状态设定,dp【i】【j】…

JavaSE:初识Java(学习笔记)

java是高级语言的面向对象语言 .[最贴近生活.最快速分析和设计程序] 一,计算机语言发展历史 二,Java体系结构 1,JavaSE(Java Standard Edition) 标准版,定位在个人计算机上的应用 这个版本是Jav…

SQL 专项练习题(合集)

1,第一题 1)表名:t_patent_detail (专利明细表) 2)表字段:专利号(patent_id)、专利名称(patent_name)、专利类型(patent_type)、申请时间 (aplly_date)、授权时间(authorize_date)、申请人(a…

使用ffmpeg播放rtsp视频流

获取IPC摄像机视频流一般使用GB28181或者RTSP协议,这两款协议是比较常见的;两者都有开源的库,下面介绍如何使用RTSP获取进行IPC视频流; 准备库 ffmepg是个开源的库,该库集成了rtsp协议,可以直接使用;首先…