MQ高级(一)消息可靠性

news2025/1/12 16:09:28

消息从生产者发送到exchange,再到queue,再到消费者,有哪些导致消息丢失的可能性

(1)发送时丢失:

1️⃣生产者发送的消息未送达exchange

2️⃣消息到达exchange后未到达queue

(2)MQ宕机,queue将消息丢失

(3)consumer接收到消息后未消费就宕机

一、生产者消息确认(P158)

1. 生产者确认机制

RabbitMQ提供了publisher confirm机制来避免消息发送到MQ过程中丢失。消息发送到MQ以后,会返回一个结果给发送者,表示消息是否处理成功。结果有两种请求:

(1)publisher-confirm,发送者确认

1️⃣消息成功投递到交换机,返回ack

2️⃣消息未投递到交换机,返回nack

(2)publisher-return,发送者回执

1️⃣消息投递到交换机了,但是没有路由到队列。返回ACK,及路由失败原因。

 

注意:

确认机制发送消息时,需要给每个消息设置一个全局唯一id,以区分不同消息,避免ack冲突

2. SpringAMQP实现生产者确认

2.1 在publisher这个微服务的application.yml中添加配置:

spring:
  rabbitmq:
    host: 192.168.150.101 # rabbitMQ的ip地址
    port: 5672 # 端口
    username: itcast
    password: 123321
    virtual-host: /
    publisher-confirm-type: correlated
    publisher-returns: true
    template:
      mandatory: true

配置说明:

(1)publish-confirm-type:开启publisher-confirm,这里支持两种类型:

1️⃣simple:同步等待confirm结果,直到超时

2️⃣correlated:异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback

(2)publish-returns:开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallback

(2)template.mandatory:定义消息路由失败时的策略。true,则调用ReturnCallback;false:则直接丢弃消息

2.2 每个RabbitTemplate只能配置一个ReturnCallback,因此需要在项目启动过程中配置:

@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {

    @Override
    public void setApplicationContext(ApplicationContext applicationContext)  {
        // 获取RabbitTemplate对象
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        // 配置ReturnCallback
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            // 记录日志
            log.error("消息发送到队列失败,响应码:{}, 失败原因:{}, 交换机: {}, 路由key:{}, 消息: {}",
                     replyCode, replyText, exchange, routingKey, message.toString());
            // 如果有需要的话,重发消息
        });
    }
}

2.3 发送消息,指定消息ID、消息ConfirmCallback

@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSendMessage2SimpleQueue() throws InterruptedException {
        // 1.准备消息
        String message = "hello, spring amqp!";
        // 2.准备CorrelationData
        // 2.1.消息ID
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        // 2.2.准备ConfirmCallback
        correlationData.getFuture().addCallback(
                // 成功回调
                result -> {
                    // 判断结果
                    if (result.isAck()) {
                        // ACK
                        log.debug("消息成功投递到交换机!消息ID: {}", correlationData.getId());
                    } else {
                        // NACK
                        log.error("消息投递到交换机失败!消息ID:{}", correlationData.getId());
                        // 重发消息
                    }
                },
                // 异常回调
                ex -> {
                    // 记录日志
                    log.error("消息发送失败!", ex);
                    // 重发消息
                });
        // 3.发送消息
        rabbitTemplate.convertAndSend("amq.topic", "a.simple.test", message, correlationData);
    }
}

二、消息持久化

MQ默认是内存存储消息,开启持久化功能可以确保缓存在MQ中的消息不丢失。

1. 交换机持久化:

    @Bean
    public DirectExchange simpleDirect(){
        //三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除 
        return new DirectExchange("simple.direct",true,false);
    }

 2. 队列持久化:

    @Bean
    public Queue simpleQueue(){
        // 使用QueueBuilder构建队列,durable就是持久化的
        return QueueBuilder.durable("simple.queue").build();
    }

3. 消息持久化,

SpringAMQP 中的的消息默认是持久的,可以通过 MessageProperties 中的 DeliveryMode 来指定的:

Message msg = MessageBuilder
            .withBody(message.getBytes(StandardCharsets.UTF_8)) // 消息体
            .setDeliveryMode(MessageDeliveryMode.PERSISTENT) // 持久化 
            .build();

三、消费者消息确认

RabbitMQ支持消费者确认机制,即:消费者处理消息后可以向MQ发送ack回执,MQ收到ack回执后才会删除该消息。而SpringAMQP则允许配置三种确认模式:

(1)manual:手动ack,需要在业务代码结束后,调用api发送ack。

(2)auto:自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack

(3)none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除

配置方式是修改application.yml文件,添加下面配置:

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1
        acknowledge-mode: auto
        

四、消费失败重试机制

1. 消费者失败重试

当消费者出现异常后,消息会不断 requeue(重新入队)到队列,再重新发送给消费者,然后再次异常,再次 requeue,无限循环,导致mq的消息处理飙升,带来不必要的压力

我们可以利用 Spring 的 retry 机制,在消费者出现异常时利用本地重试,而不是无限制的requeue 到 mq 队列。

spring:
  rabbitmq:
    addresses: 192.168.150.101:8071, 192.168.150.101:8072, 192.168.150.101:8073
    username: itcast
    password: 123321
    virtual-host: /
    listener:
      simple:
        prefetch: 1
        acknowledge-mode: auto
        retry:
          enabled: true    #开启消费者失败重试
          initial-interval: 1000    # 初始的失败等待时长为1秒
          multiplier: 3    # 下次失败的等待时长倍数,下次等待时长 = multiplier * last-interval
          max-attempts: 4    # 最大重试次数
          stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false
        

2. 消费者失败消息处理策略

在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有 MessageRecoverer 接口来处理,它包含三种不同的实现:

(1)RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式

(2)ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队 (3)RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机

 

测试下RepublishMessageRecoverer处理模式:

首先,定义接收失败消息的交换机、队列及其绑定关系。

然后,定义RepublishMessageRecoverer:

@Configuration
public class ErrorMessageConfig {

    @Bean
    public DirectExchange errorMessageExchange(){
        return new DirectExchange("error.direct");
    }

    @Bean
    public Queue errorQueue(){
        return new Queue("error.queue");
    }

    @Bean
    public Binding errorMessageBinding(){
        return BindingBuilder.bind(errorQueue()).to(errorMessageExchange()).with("error");
    }

    @Bean
    public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
        return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/70735.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【物理应用】基于傅里叶伽辽金谱法二维纳维-斯托克斯附matlab代码

✅作者简介:热爱科研的Matlab仿真开发者,修心和技术同步精进,matlab项目合作可私信。 🍎个人主页:Matlab科研工作室 🍊个人信条:格物致知。 更多Matlab仿真内容点击👇 智能优化算法 …

[附源码]计算机毕业设计教育企业网站Springboot程序

项目运行 环境配置: Jdk1.8 Tomcat7.0 Mysql HBuilderX(Webstorm也行) Eclispe(IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持)。 项目技术: SSM mybatis Maven Vue 等等组成,B/S模式 M…

当我们说大数据Hadoop,究竟在说什么?

前言 提到大数据,大抵逃不过两个问题,一个是海量的数据该如何存储,另外一个就是那么多数据该如何进行查询计算呢。好在这些问题前人都有了解决方案,而Hadoop就是其中的佼佼者,是目前市面上最流行的一个大数据软件&…

【精品】【含数据+代码+论文链接】交通流预测代码集合

前言 交通流预测代码集合 一、T-GCN 一种用于流量预测的时间图卷积网络 准确、实时的交通预测在智能交通系统中起着重要作用,对城市交通规划、交通管理和交通控制具有重要意义。然而,交通预测一直被认为是一个开放的科学问题,受限于城市…

Python完成期末大作业:简易计算器【案例分享】

嗨害大家好鸭!我是小熊猫~ 好像好久都没给大家更新啦! 这次来给大家做一个我弟刚刚做完的期末考试大作业 做一个简易计算器 要求: 1.要有加减乘除四个方法的编写2.提交的代码悟编译错误3.代码需要有基础的健壮性判断 源码、资料电子书点击…

汉明码(海明码)解析

文章目录前言启发汉明码介绍怎么实现汉明码?怎么实现更高模块的汉明码?为什么校验位一定是2的n次方?用更简洁的方式理解汉明码前言 相信使用过光盘的读者都会有这样一种经历,如果不小心刮花了盘面,大部分情况下,把它放进DVD机器却仍然可以播放视频,这是为什么呢? 因为光盘…

Tomcat打破双亲委派机制实现各应用程序的类库相互隔离原理与实现demo

1、实现原理 以Tomcat类加载为例,Tomcat 如果使用默认的双亲委派类加载机制行不行? 我们思考一下:Tomcat是个web容器, 那么它要解决什么问题: 1. 一个web容器可能需要部署两个应用程序,不同的应用程序可能会…

C++对const引用的特殊处理、为什么函数形参的引用建议加上const?只是为了防止值被修改吗?

前言:我们知道普通变量、指针、函数形参,加上const修饰表示不可改变,但是引用前面加上const就有特别之处了 目录 const日常使用 const引用的特别处理 const引用创建临时变量规则 引用形参声明为const的三个理由 const日常使用 我们知道如…

resnet(2)------看看卷积

文章目录1 . 人脑是怎么认识到物体的2. 卷积3. 卷积核1 . 人脑是怎么认识到物体的 在谈卷积之前,我们先来了解一下人是怎么认识物体的。 人脑是个非常复杂的结构,是由无数个神经元连接起来,每个神经元都有自己负责记忆的东西。当人眼看到物体…

关于自增约束auto_increment需要注意的地方,mysql8版本的报错

目录一,自增约束auto_increment需要注意的地方附:就算插入数据失败,也进行自增:二,自增约束auto_increment在MySQL8版本的报错:一,自增约束auto_increment需要注意的地方 1 创建数据库表class&…

Logistic回归

通常,Logistic回归用于二分类问题,例如预测明天是否会下雨。当然它也可以用于多分类问题. Logistic回归是分类方法,它利用的是Sigmoid函数阈值在[0,1]这个特性。Logistic回归进行分类的主要思想是:根据现有数据对分类边界线建立回…

啊?我这手速也太差了吧?——C++Easyx“挑战六秒”小游戏

🐑本文作者:C橙羊🐑 🎮🔊本文代码适合编译环境:DEV-C💻 ✨🧨温馨提示:此文转载于codebus🎉🎠 最近橙羊在Easyx官网的codebus里随便逛逛的时候&am…

SpringMVC从入门到精通(一)

文章目录1. SpringMVC基本概念1.1 三层架构1.2 MVC架构1.3 什么是SpringMVC1.4 SpringMVC的优势2. SpringMVC 的入门2.1 入门程序2.2 SpringMVC执行原理刨析2.3 SpringMVC的核心执行流程2.4 SpringMVC的组件3. RequestMapping注解4.请求参数绑定4.1 参数绑定4.2 请求参数乱码问…

磨金石摄影技能干货分享|优秀纪实摄影作品欣赏—北京记事

1、蜂窝煤 三名青年男子踏着三轮车拉着满满一车蜂窝煤。脸上流露出清澈的笑容。这是九十年代的北京,背后的天安门格外的显眼。那时候处于改革开放的初期,虽然还不是很富裕,但大家脸上洋溢着幸福与希望的笑容。 蜂窝煤是冬天必备,九…

【强化学习论文合集】十一.2018国际表征学习大会论文(ICLR2018)

强化学习(Reinforcement Learning, RL),又称再励学习、评价学习或增强学习,是机器学习的范式和方法论之一,用于描述和解决智能体(agent)在与环境的交互过程中通过学习策略以达成回报最大化或实现特定目标的问题。 本专栏整理了近几年国际顶级会议中,涉及强化学习(Rein…

历届青少年蓝桥杯python编程选拔赛 STEMA评测比赛真题解析【持续更新 已更新至34题】

蓝桥杯python选拔赛真题 历届青少年蓝桥杯python编程选拔赛真题解析 选拔赛 真题34-回文数升级 【蓝桥杯选拔赛真题34】python回文数升级 青少年组蓝桥杯python 选拔赛STEMA比赛真题解析_小兔子编程的博客-CSDN博客python回文数升级2020年青少年组python蓝桥杯选拔赛真题一、…

剑指Offer39——数组中出现次数超过一半的数字

摘要 剑指Offer39 数组中出现次数超过一半的数字 本题常见的三种解法: 哈希表统计法: 遍历数组 nums ,用 HashMap 统计各数字的数量,即可找出 众数 。此方法时间和空间复杂度均为 O(N) 。数组排序法: 将数组 nums 排…

Python学习-8.1.1 标准库(time库的基础与实例)

2.1 time库 time库是Python提供的处理时间标准库。time库提供系统级精确计时器的计时功能,可以用来分析程序性能,也可以让程序暂停运行时间。 2.1.1 时间处理函数 time.time()函数:获取当前时间戳。 代表着如今的时间与1970年1月1日0分0秒…

18.10 字节码指令集与解析举例 - 同步控制指令

同步控制指令 组成 java虚拟机支持两种同步结构:方法级的同步和方法内部一段指令序列的同步,这两种同步都是使用monitor来支持的。 方法级的同步 方法级的同步:是隐式的,即无须通过字节码指令来控制,它实现在方法调…

Java+SSM网上书城全套含微信支付电商购物(含源码+论文+答辩PPT等)

项目功能简介: 本项目含代码详细讲解视频,手把手带同学们敲代码从0到1完成项目 该项目采用技术Springmvc、Spring、MyBatis、Tomcat服务器、MySQL数据库 项目含有源码、配套开发软件、软件安装教程、项目发布教程以及代码讲解教程 项目功能介绍: 系统管理…