SpringBoot教程(十五) | SpringBoot集成RabbitMq(消息丢失、消息重复、消息顺序、消息顺序)

news2025/1/23 3:14:02

SpringBoot教程(十五) | SpringBoot集成RabbitMq(消息丢失、消息重复、消息顺序、消息顺序)

  • RabbitMQ常见问题解决方案
    • 问题一:消息丢失的解决方案
      • (1)生成者丢失消息
        • 丢失的情景
        • 解决方案1:发送方确认机制(推荐,最常用)
        • 解决方案2:事务(不推荐,因为性能差)
      • (2)MQ丢失消息
        • 丢失的情景
        • 解决方案:开启RabbitMQ的持久化+开启镜像队列
      • (3)消费者丢失消息
        • 丢失的情景 1
        • 解决方案:无需解决
        • 丢失的情景 2
        • 扩展:重试机制
        • 解决方案:消费者方确认机制(推荐,最常用)
    • 问题二:消息重复的解决方案
      • 什么时候会重复消费
      • 如何解决
    • 问题三:保证消息顺序的解决方案
      • 单一队列和单一消费者模式(RabbitMQ)
    • 问题四:消息堆积的解决方案
      • 消息堆积原因
      • 预防措施
      • 已出事故的解决措施

RabbitMQ常见问题解决方案

问题一:消息丢失的解决方案

首先明确一条消息的传送流程:生产者->MQ->消费者
所以这三个节点都可能丢失数据

(1)生成者丢失消息

丢失的情景

发送消息过程中出现网络问题:生产者以为发送成功,但RabbitMQ server没有收到

解决方案1:发送方确认机制(推荐,最常用)

发送方确认机制最大的好处在于它是异步的,等信道返回ark确认的同时继续发送下一条消息(不会堵塞其他消息的发送)

(一)修改application.properties配置

# 确认消息已发送到交换机(Exchange)
spring.rabbitmq.publisher-confirms=true #旧版本 
spring.rabbitmq.publisher-confirm-type=correlated #新版本 
# 确认消息已发送到队列(Queue)
spring.rabbitmq.publisher-returns=true

springBoot 2.2.0.RELEASE版本之前 是使用 spring.rabbitmq.publisher-confirms=true
在2.2.0及之后 使用spring.rabbitmq.publisher-confirm-type=correlated 属性配置代替

(二)新建配置文件RabbitTemplate

对于 发送确认 写法有多种方式,以下的是其中的一种方式

import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitTemplateConfig {
    @Bean
    public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate = new RabbitTemplate();
        rabbitTemplate.setConnectionFactory(connectionFactory);
        //setMandatory设置表示:消息在没有被队列接收时是否应该被退回给生产者(true:退回;false:丢弃)。
        //通常与yml配置文件中的publisher-returns配合一起使用,若不配置该项,setReutrnCallback将不会有消息返回
        rabbitTemplate.setMandatory(true);

        //帮助生产者判断 确认消息是否成功发送到RabbitMQ
        //ack 为true表示已发送成功 false表示发送失败
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            System.out.println("ConfirmCallback:     "+"相关数据:"+correlationData);
            System.out.println("ConfirmCallback:     "+"确认情况:"+ack);
            System.out.println("ConfirmCallback:     "+"原因:"+cause);
        });

        //当消息无法 放到队列里面时 返回的提醒
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            System.out.println("ReturnCallback:     "+"消息:"+message);
            System.out.println("ReturnCallback:     "+"回应码:"+replyCode);
            System.out.println("ReturnCallback:     "+"回应信息:"+replyText);
            System.out.println("ReturnCallback:     "+"交换机:"+exchange);
            System.out.println("ReturnCallback:     "+"路由键:"+routingKey);
        });
        return rabbitTemplate;
    }
}
解决方案2:事务(不推荐,因为性能差)

RabbitMQ提供的事务功能,在生产者发送数据之前开启RabbitMQ事务

(2)MQ丢失消息

丢失的情景

RabbitMQ服务端接收到消息后由于服务器宕机或重启等原因(消息默认存在内存中)导致消息丢失;

解决方案:开启RabbitMQ的持久化+开启镜像队列

RabbitMQ的持久化分为三个部分:交换器的持久化、队列的持久化、消息的持久化
三者 都 持久化 才能保证 RabbitMQ服务重启之后,消息才能存在且能发出去

交换机持久化
交换机持久化描述的是当这个交换机上没有注册队列时,这个交换机是否删除。
如果要打开持久化的话也很简单 (上面列子都是有体现的)

//定义直接交换机
@Bean
public DirectExchange directExchange() {
   //第一个参数:定义交换机的名称,第二个参数:是否持久化,第三个参数:是否自动删除
   return new DirectExchange("directExchange", true, false);
}

队列持久化
队列持久化描述的是当这个队列没有消费者在监听时,是否进行删除。
持久化做法:

//定义队列
@Bean
public Queue directQueue() {
   //第一个参数:队列的名称,第二个参数:是否持久化
   return new Queue("directQueue", true);
}

消息持久化

关键配置 持久化(MessageDeliveryMode.PERSISTENT)

@Test
public void testDurableMessage() {
    // 1.准备消息
    Message message = MessageBuilder.withBody("hello, rabbitmq".getBytes(StandardCharsets.UTF_8))
            .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
            .build();
    // 2.消息ID,封装到CorrelationData中
    CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
    // 3.发送消息
    rabbitTemplate.convertAndSend("simple.queue", message, correlationData);

    log.info("发送消息成功");
}

(3)消费者丢失消息

丢失的情景 1

RabbitMQ服务端向消费者发送完消息之后,网络断了,消息并没有到达消费者

解决方案:无需解决

无需解决。因为此情景下服务端收不到确认消息,会再次发送的。

丢失的情景 2

启用了重试机制,重试指定次数之后,还没成功,但消息被确认。

扩展:重试机制

重试机制的三大前提

  1. 重试模式已启用:通过配置 spring.rabbitmq.listener.simple.retry.enabled=true 来启用重试模式。
  2. 抛出了异常:在 @RabbitListener 标注的方法中抛出了异常,通常是 RuntimeException 或 Error。
    Spring AMQP 会捕获这些异常并根据配置的重试策略来重试消息。
  3. 未达到最大重试次数:消息的重试次数尚未达到配置的最大值(spring.rabbitmq.listener.simple.retry.maxAttempts)。

配置以下即可实现重试操作

# 是否支持重试
spring.rabbitmq.listener.simple.retry.enabled=true
# 重试次数(默认3次)
spring.rabbitmq.listener.simple.retry.max-attempts=5
解决方案:消费者方确认机制(推荐,最常用)

改成手动后就 可以实现 “先操作业务逻辑(数据库操作)后,再手动从队列上删除这个消息” 的动作
其中“从队列上删除这个消息“这个动作体现就是 使用 channel.basicAck 去完成的。
切记改成手动后,这个channel.basicAck方法一定要写。

(一)修改application.properties配置

# 设置消费端手动 ack
spring.rabbitmq.listener.simple.acknowledge-mode=manual

(二)修改Service接收信息项

当消息在进入 emailProcess、smsProcess(被@RabbitListener注解) 方法时就已经被视为“接收到了”,但是需要 你 执行 channel.basicAck(手动确认)才能让这个消息从队列上删除。

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.stereotype.Service;

import java.io.IOException;


@Service
public class DirectReceiver {

    @RabbitHandler
    @RabbitListener(queues = "emailQueue")  //监听的队列名称
    public void emailProcess(Channel channel, Message message) throws IOException {
        try{
            System.out.println(new String(message.getBody(),"UTF-8"));
            //TODO 具体业务

            .......
            //你使用手动消息确认模式时,basicAck 一定要执行,不然会导致会保留在队列中,无法被消费
            //第1个参数表示消息投递序号
            //第2个参数false只确认当前一个消息收到(大多数情况下都设置为false),true确认所有consumer获得的消息
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        } catch (Exception e) {
            //若是消息没有成功接收,第二个参数设置为true的话,代表重新放回队列中,false则为丢弃,在此也可以做成放置死信队列的操作
            channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
        }
    }

    
}

确认和拒绝消息:

  • basicAck: 这个方法用于确认消息已被成功处理。
    第一个参数是消息的delivery tag(用于标识消息),
    第二个参数指定是否批量确认(false表示只确认当前消息)。
  • basicReject: 这个方法用于拒绝消息。
    第一个参数同样是delivery tag,
    第二个参数指定是否将消息重新放回队列(false表示不重新放回,即丢弃消息)。

方法解释:

  • emailProcess: 这个方法监听emailQueue队列。
    当队列中有消息时,它会打印出消息的内容,并尝试确认消息。
    如果处理过程中发生异常,它会拒绝消息,但不会重新放回队列(第二个参数为false)。

问题二:消息重复的解决方案

什么时候会重复消费

1.自动提交模式时

消费者收到消息后,要自动提交,但提交后,网络出故障,RabbitMQ服务器没收到提交消息,那么此消息会被重新放入队列,会再次发给消费者。

2.手动提交模式时

情景1:网络故障问题,同上。
情景2:接收到消息并处理结束了,此时消费者挂了,没有手动提交消息。

总体来说就是:网络不可达消费端宕机

如何解决

消费端处理消息的业务逻辑保持幂等性

比如你拿个数据要写库,先根据主键查一下,如果这数据有了,就别插入了,update 一下。
比如你是写 Redis,那没问题了,反正每次都是 set,天然幂等性。

问题三:保证消息顺序的解决方案

单一队列和单一消费者模式(RabbitMQ)

在这里插入图片描述

在RabbitMQ中,可以确保一个队列只被一个消费者消费,这样可以保证消息按照发送的顺序被处理。
因为队列本身就是一个先进先出的结构。

适用场景:RabbitMQ用户且对消息顺序有严格要求的场景。
优点:实现简单,易于管理。
缺点:可能成为性能瓶颈,在处理大量消息时需要考虑扩展性。

问题四:消息堆积的解决方案

消息堆积原因

消息堆积即消息没及时被消费,是生产者生产消息速度快于消费者消费的速度导致的。
消费者消费慢可能是因为:本身逻辑耗费时间较长、阻塞了。

预防措施

生产者
1.减少发布频率
3.考虑使用队列最大长度限制
消费者
1.优化代码

已出事故的解决措施

情况1:堆积的消息还需要使用

方案1:简单修复

修复 消费者(consumer)的问题,让他恢复消费速度,然后等待几个小时消费完毕

方案2:复杂修复

单队列消费转变为多队列并行消费

也是需要先 修复 消费者(consumer)的问题,再进行下面的步骤

步骤 1: 队列和路由设置
1.创建新队列:在RabbitMQ中创建10个新队列,每个队列分配一个独特的名称。
2. 设置交换机:定义一个直连型(Direct)交换机。
3. 绑定路由键:将每个新队列通过唯一的路由键绑定到直连型交换机上。

伪代码例子:

// 假设这是配置类的一部分  
@Bean  
Queue queue1() {  
    return new Queue("queue1", false);  
}  
@Bean  
Queue queue2() { 
    return new Queue("queue2", false);  
}
// 以此类推,为其他9个队列创建Bean  
.........
@Bean  
DirectExchange exchange() {  
    return new DirectExchange("myExchange");  
}  
@Bean  
Binding binding1(Queue queue1, DirectExchange exchange) {  
    return BindingBuilder.bind(queue1).to(exchange).with("routingKey1");  
}  
@Bean  
Binding binding2(Queue queue2, DirectExchange exchange) { 
    return BindingBuilder.bind(queue2).to(exchange).with("routingKey2");  
}
// 以此类推,为其他队列和路由键创建绑定  
......

步骤 2: 消息分发
1.接收堆积数据:现有消费者(或分发者)接收从发送者处堆积的数据。
2.分发到新队列:实现分发逻辑,将接收到的消息根据路由键分发到相应的10个新队列中。

伪代码例子:

@RabbitListener(queues = "oldQueue")  
public void emailProcess(Message message, Channel channel) throws IOException {  
    try {  
        // 生成1-10之间的顺序数  
        SequentialRandom sequentialRandom = new SequentialRandom()
        String key = sequentialRandom.getNextSequentialRandom();
        // 重新发送消息到交换机,交换机将根据routingKey将消息路由到正确的队列  
        rabbitTemplate.convertAndSend("myExchange", "routingKey"+key, new String(message.getBody(),"UTF-8")); 
  
        // 确认原始队列中的消息(如果您想要的话)  
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);  
    } catch (Exception e) {  
        // 处理异常,可能包括记录日志、发送警报等  
        channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);  
    }  
}  
public class SequentialRandom {  
    private int currentIndex = 1; // 初始索引为1  
  
    /**  
     * 获取下一个顺序数
     * @return 下一个数字,从1到10循环  
     */  
    public int getNextSequentialRandom() {  
        int next = currentIndex;  
        currentIndex = (currentIndex % 10) + 1; // 使用模运算实现循环,并更新索引  
        return next;  
    }  
}

步骤 3: 并行消费
1.开发新消费端:编写新的消费端程序,该程序能够监并处理来自10个新队列的消息。
2. 部署并启动:将新消费端程序部署到服务器,并启动它以开始并行消费。

伪代码例子:

@Component  
public class ParallelConsumer {  
  
      @RabbitListener(queues = {"queue1"})  
      public void receiveMessage1(Message message) {  
      // 处理消息  
      }  
  
      @RabbitListener(queues = {"queue2"})  
      public void receiveMessage2(Message message) {  
      // 处理消息  
      }  
  
      // ... 
      @RabbitListener(queues = {"queue10"})  
      public void receiveMessage3(Message message) {  
      // 处理消息  
      }  
}

情况2:堆积的消息不需要使用

删除消息即可。(可以在RabbitMQ控制台删除,或者使用命令)。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2113469.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

TensorRT-LLM高级用法

--multi_block_mode decoding phase, 推理1个新token, 平时:按照batch样本,按照head,将计算平均分给所有SM; batch_size*num_heads和SM数目相比较小时:有些SM会空闲;加了--multi_block_mode&…

JavaScript 知识点(从基础到进阶)

🌏个人博客主页:心.c ​ 前言:JavaScript已经学完了,和大家分享一下我的笔记,希望大家可以有所收获,花不多说,开干!!! 🔥🔥&#x1f5…

urllib与requests爬虫简介

urllib与requests爬虫简介 – 潘登同学的爬虫笔记 文章目录 urllib与requests爬虫简介 -- 潘登同学的爬虫笔记第一个爬虫程序 urllib的基本使用Request对象的使用urllib发送get请求实战-喜马拉雅网站 urllib发送post请求 动态页面获取数据请求 SSL证书验证伪装自己的爬虫-请求头…

【redis】数据量庞大时的应对策略

文章目录 为什么数据量多了主机会崩分布式系统应用数据分离架构应用服务集群架构负载均衡器数据库读写分离 引入缓存冷热分离架构 分库分表微服务是什么代价优势 为什么数据量多了主机会崩 一台主机的硬件资源是有上限的,包括但不限于一下几种: CPU内存…

【最新华为OD机试E卷-支持在线评测】猜字迷(100分)-多语言题解-(Python/C/JavaScript/Java/Cpp)

🍭 大家好这里是春秋招笔试突围 ,一枚热爱算法的程序员 ✨ 本系列打算持续跟新华为OD-E/D卷的三语言AC题解 💻 ACM金牌🏅️团队| 多次AK大厂笔试 | 编程一对一辅导 👏 感谢大家的订阅➕ 和 喜欢💗 🍿 最新华为OD机试D卷目录,全、新、准,题目覆盖率达 95% 以上,…

.hmallox、.rmallox勒索病毒揭秘:如何保护你的数据免受威胁

导言 .hmallox、.rmallox勒索病毒是一种加密型勒索病毒,以其特定的加密机制和传播方式而闻名。它主要通过钓鱼邮件或恶意下载链接感染计算机系统。一旦入侵系统,它会加密受害者的文件,并要求支付赎金以恢复数据。了解 .hmallox 、.rmallox勒…

2024数学建模国赛选题建议+团队助攻资料(已更新完毕)

目录 一、题目特点和选题建议 二、模型选择 1、评价模型 2、预测模型 3、分类模型 4、优化模型 5、统计分析模型 三、white学长团队助攻资料 1、助攻代码 2、成品论文PDF版 3、成品论文word版 9月5日晚18:00就要公布题目了,根据历年竞赛题目…

解决npm i 安装报npm ERR! code E401

1、前端去维护项目时,通过 git clone 下来以后,经常是直接 npm i 去安装项目需要的依赖,但是往往很多项目不是我们自己写的,或者从 GitHub 上面 clone 的开源项目,这个时候出现问题就很难处理,这里分享下安…

java基础-线程实现

文章目录 什么是线程线程的基本特性线程的状态线程的调度 线程的实现方式1. 继承 Thread 类2. 实现 Runnable 接口3. 使用 Callable 和 Future4. 使用 ExecutorService总结 什么是线程 线程(Thread)是计算机科学中的一个重要概念,它是操作系…

EmguCV学习笔记 C# 9.2 VideoWriter类

版权声明:本文为博主原创文章,转载请在显著位置标明本文出处以及作者网名,未经作者允许不得用于商业目的。 EmguCV是一个基于OpenCV的开源免费的跨平台计算机视觉库,它向C#和VB.NET开发者提供了OpenCV库的大部分功能。 教程VB.net版本请访问…

【数据结构】--初识泛型

1. 包装类 在Java中,由于基本类型不是继承自Object,为了在泛型代码中可以支持基本类型,Java给每个基本类型都对应了一个包装类型。 1.1 基本数据类型和对应的包装类 除了 Integer 和 Character, 其余基本类型的包装类都是首字母…

1051 找矩阵中的鞍点

### 思路 1. 输入一个3行4列的整数矩阵。 2. 遍历每一行,找到每一行的最大值及其列索引。 3. 检查该列索引对应的列中是否是最小值。 4. 如果是,则输出该值;如果没有找到鞍点,输出“NO”。 ### 伪代码 1. 初始化一个3行4列的矩阵…

SLT—List详解

1.list概述 相较于 vector 的连续线性空间,list 就显得复杂很多,它的好处是每次插入或删除一个数据,就配置或释放一个元素空间。因此,list 对于空间的运用有绝对的精准,一点也不浪费。对于任何位置的元素进行插入或者元…

连续信号的matlab表示

复习信号与系统以及matlab 在matlab中连续信号使用较小的采样间隔来表四 1.单位阶跃信号 阶跃信号:一个理想的单位阶跃信号在时间 t 0 之前值为0,在 t 0 及之后值突然变为常数 A(通常取 A 1) %matlab表示连续信号,是让信号的采样间隔很小…

WebGIS面试题(第九期)

坐标系: 文章目录 **坐标系:**1、如何使用ArcGIS进行GIS坐标系转换?2、Cesium中的Cartesian3坐标系的原点在哪里?它的轴是如何定义的?3、如何在Cesium中使用矩阵进行坐标系转换。4、在Cesium中,如何将屏幕坐…

在VScode上写网页(html)

一、首先点进VScode,下载3个插件。 VScode安装:VScode 教程 | 菜鸟教程 二、新建 HTML 文件 作者运行的代码来自:http://t.csdnimg.cn/vIAQi 把代码复制粘贴进去,然后点击文件→另存为→选择html格式。 三、运行代码

笔试强训,[NOIP2002普及组]过河卒牛客.游游的水果大礼包牛客.买卖股票的最好时机(二)二叉树非递归前序遍历

目录 [NOIP2002普及组]过河卒 牛客.游游的水果大礼包 牛客.买卖股票的最好时机(二) 二叉树非递归前序遍历 [NOIP2002普及组]过河卒 题里面给的提示很有用,那个马的关系,后面就注意,dp需要作为long的类型。 import java.util.Sc…

店匠科技携手Stripe共谋电商支付新篇章

在全球电商行业蓬勃发展的背景下,支付环节作为交易闭环的核心,其重要性日益凸显。随着消费者对支付体验要求的不断提高,以及跨境电商的迅猛发展,支付市场正经历着前所未有的变革与挑战。在这一充满机遇与竞争的领域,店匠科技(Shoplazza)凭借其创新的嵌入式支付解决方案—— Sho…

[米联客-XILINX-H3_CZ08_7100] FPGA程序设计基础实验连载-39 HDMI视频输入测试

软件版本:VIVADO2021.1 操作系统:WIN10 64bit 硬件平台:适用 XILINX A7/K7/Z7/ZU/KU 系列 FPGA 实验平台:米联客-MLK-H3-CZ08-7100开发板 板卡获取平台:https://milianke.tmall.com/ 登录“米联客”FPGA社区 http…

软考(计算机技术与软件专业技术资格(水平)考试)

天行健,君子以自强不息;地势坤,君子以厚德载物。 每个人都有惰性,但不断学习是好好生活的根本,共勉! 文章均为学习整理笔记,分享记录为主,如有错误请指正,共同学习进步。…