Rabbitmq消息不丢失

news2025/1/8 4:39:09

目录

  • 一、消息不丢失
    • 1.消息确认
    • 2.消息确认业务封装
      • 2.1 发送确认消息测试
      • 2.2 消息发送失败,设置重发机制

一、消息不丢失

消息的不丢失,在MQ角度考虑,一般有三种途径:
1,生产者不丢数据
2,MQ服务器不丢数据
3,消费者不丢数据
保证消息不丢失有两种实现方式:
1,开启事务模式
2,消息确认模式
说明:开启事务会大幅降低消息发送及接收效率,使用的相对较少,因此我们生产环境一般都采取消息确认模式,以下我们只是讲解消息确认模式

1.消息确认

消息持久化
如果希望RabbitMQ重启之后消息不丢失,那么需要对以下3种实体均配置持久化
Exchange
声明exchange时设置持久化(durable = true)并且不自动删除(autoDelete = false)
Queue
声明queue时设置持久化(durable = true)并且不自动删除(autoDelete = false)
message
发送消息时通过设置deliveryMode=2持久化消息

处理消息队列丢数据的情况,一般是开启持久化磁盘的配置。这个持久化配置可以和confirm机制配合使用,你可以在消息持久化磁盘后,再给生产者发送一个Ack信号。这样,如果消息持久化磁盘之前,rabbitMQ阵亡了,那么生产者收不到Ack信号,生产者会自动重发。那么如何持久化呢,其实也很容易,就下面两步:
1、将queue的持久化标识durable设置为true,则代表是一个持久的队列
2、发送消息的时候将deliveryMode=2
这样设置以后,rabbitMQ就算挂了,重启后也能恢复数据

发送确认
有时,业务处理成功,消息也发了,但是我们并不知道消息是否成功到达了rabbitmq,如果由于网络等原因导致业务成功而消息发送失败,那么发送方将出现不一致的问题,此时可以使用rabbitmq的发送确认功能,即要求rabbitmq显式告知我们消息是否已成功发送。

手动消费确认
有时,消息被正确投递到消费方,但是消费方处理失败,那么便会出现消费方的不一致问题。比如:订单已创建的消息发送到用户积分子系统中用于增加用户积分,但是积分消费方处理却都失败了,用户就会问:我购买了东西为什么积分并没有增加呢?
要解决这个问题,需要引入消费方确认,即只有消息被成功处理之后才告知rabbitmq以ack,否则告知rabbitmq以nack

2.消息确认业务封装

service-mq修改配置
开启rabbitmq消息确认配置,在common的配置文件中都已经配置好了!

spring:
  rabbitmq:
    host: 192.168.121.140
    port: 5672
    username: admin
    password: admin
    publisher-confirms-type: correlated  #交换机的确认
    publisher-returns: true  #队列的确认
    listener:
      simple:
        acknowledge-mode: manual #默认情况下消息消费者是自动确认消息的,如果要手动确认消息则需要修改确认模式为manual
        prefetch: 1 # 消费者每次从队列获取的消息数量。此属性当不设置时为:轮询分发,设置为1为:公平分发

搭建rabbit-util模块
由于消息队列是公共模块,我们把mq的相关业务封装到该模块,其他service微服务模块都可能使用,因此我们把他封装到一个单独的模块,需要使用mq的模块直接引用该模块即可
搭建方式如:
pom.xml

    <dependencies>
        <!--rabbitmq消息队列-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <!--rabbitmq 协议-->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-bus-amqp</artifactId>
        </dependency>
    </dependencies>

4.2.4 封装发送端消息确认

/**
 * @Description 消息发送确认
 * <p>
 * ConfirmCallback  只确认消息是否正确到达 Exchange 中
 * ReturnCallback   消息没有正确到达队列时触发回调,如果正确到达队列不执行
 * <p>
 * 1. 如果消息没有到exchange,则confirm回调,ack=false
 * 2. 如果消息到达exchange,则confirm回调,ack=true
 * 3. exchange到queue成功,则不回调return
 * 4. exchange到queue失败,则回调return
 * 
 */
@Component
@Slf4j
public class MQProducerAckConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    // 修饰一个非静态的void()方法,在服务器加载Servlet的时候运行,并且只会被服务器执行一次在构造函数之后执行,init()方法之前执行。
    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(this);            //指定 ConfirmCallback
        rabbitTemplate.setReturnCallback(this);             //指定 ReturnCallback
    }

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            log.info("消息发送成功:" + JSON.toJSONString(correlationData));
        } else {
            log.info("消息发送失败:" + cause + " 数据:" + JSON.toJSONString(correlationData));
        }
    }

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        // 反序列化对象输出
        System.out.println("消息主体: " + new String(message.getBody()));
        System.out.println("应答码: " + replyCode);
        System.out.println("描述:" + replyText);
        System.out.println("消息使用的交换器 exchange : " + exchange);
        System.out.println("消息使用的路由键 routing : " + routingKey);
    }

 }

封装消息发送

@Service
public class RabbitService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     *  发送消息
     * @param exchange 交换机
     * @param routingKey 路由键
     * @param message 消息
     */
    public boolean sendMessage(String exchange, String routingKey, Object message) {
 
        rabbitTemplate.convertAndSend(exchange, routingKey, message);
        return true;
    }
   
}

2.1 发送确认消息测试

消息发送端

@RestController
@RequestMapping("/mq")
public class MqController {


   @Autowired
   private RabbitService rabbitService;


   /**
    * 消息发送
    */
   //http://localhost:8282/mq/sendConfirm
   @GetMapping("sendConfirm")
   public Result sendConfirm() {
     
      rabbitService.sendMessage("exchange.confirm", "routing.confirm", "来人了,开始接客吧!");
      return Result.ok();
   }
}

消息接收端

@Component
public class ConfirmReceiver {

@SneakyThrows
@RabbitListener(bindings=@QueueBinding(
        value = @Queue(value = "queue.confirm",autoDelete = "false"),
        exchange = @Exchange(value = "exchange.confirm",autoDelete = "true"),
        key = {"routing.confirm"}))
public void process(Message message, Channel channel){
    System.out.println("RabbitListener:"+new String(message.getBody()));

        // false 确认一个消息,true 批量确认
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    
}
}

测试:http://localhost:8282/mq/sendConfirm

2.2 消息发送失败,设置重发机制

实现思路:借助redis来实现重发机制
模块中添加依赖

<!-- redis -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

<!-- spring2.X集成redis所需common-pool2-->
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-pool2</artifactId>
</dependency>

<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
</dependency>

自定义一个实体类来接收消息

@Data
public class GmallCorrelationData extends CorrelationData {

    //  消息主体
    private Object message;
    //  交换机
    private String exchange;
    //  路由键
    private String routingKey;
    //  重试次数
    private int retryCount = 0;
    //  消息类型  是否是延迟消息
    private boolean isDelay = false;
    //  延迟时间
    private int delayTime = 10;
}

修改发送方法

//  封装一个发送消息的方法
public Boolean sendMsg(String exchange,String routingKey, Object msg){
    //  将发送的消息 赋值到 自定义的实体类
    GmallCorrelationData gmallCorrelationData = new GmallCorrelationData();
    //  声明一个correlationId的变量
    String correlationId = UUID.randomUUID().toString().replaceAll("-","");
    gmallCorrelationData.setId(correlationId);
    gmallCorrelationData.setExchange(exchange);
    gmallCorrelationData.setRoutingKey(routingKey);
    gmallCorrelationData.setMessage(msg);

    //  发送消息的时候,将这个gmallCorrelationData 对象放入缓存。
    redisTemplate.opsForValue().set(correlationId, JSON.toJSONString(gmallCorrelationData),10, TimeUnit.MINUTES);
    //  调用发送消息方法
    //this.rabbitTemplate.convertAndSend(exchange,routingKey,msg);
    this.rabbitTemplate.convertAndSend(exchange,routingKey,msg,gmallCorrelationData);
    //  默认返回true
    return true;
}

发送失败调用重发方法  MQProducerAckConfig 类中修改
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
    //  ack = true 说明消息正确发送到了交换机
    if (ack){
        System.out.println("哥们你来了.");
        log.info("消息发送到了交换机");
    }else {
        //  消息没有到交换机
        log.info("消息没发送到交换机");
        //  调用重试发送方法
        this.retrySendMsg(correlationData);
    }
}

@Override
public void returnedMessage(Message message, int code, String codeText, String exchange, String routingKey) {
    System.out.println("消息主体: " + new String(message.getBody()));
    System.out.println("应答码: " + code);
    System.out.println("描述:" + codeText);
    System.out.println("消息使用的交换器 exchange : " + exchange);
    System.out.println("消息使用的路由键 routing : " + routingKey);

    //  获取这个CorrelationData对象的Id  spring_returned_message_correlation
    String correlationDataId = (String) message.getMessageProperties().getHeaders().get("spring_returned_message_correlation");
    //  因为在发送消息的时候,已经将数据存储到缓存,通过 correlationDataId 来获取缓存的数据
    String strJson = (String) this.redisTemplate.opsForValue().get(correlationDataId);
    //  消息没有到队列的时候,则会调用重试发送方法
    GmallCorrelationData gmallCorrelationData = JSON.parseObject(strJson,GmallCorrelationData.class);
    //  调用方法  gmallCorrelationData 这对象中,至少的有,交换机,路由键,消息等内容.
    this.retrySendMsg(gmallCorrelationData);
}

/**
 * 重试发送方法
 * @param correlationData   父类对象  它下面还有个子类对象 GmallCorrelationData
 */
private void retrySendMsg(CorrelationData correlationData) {
    //  数据类型转换  统一转换为子类处理
    GmallCorrelationData gmallCorrelationData = (GmallCorrelationData) correlationData;
    //  获取到重试次数 初始值 0
    int retryCount = gmallCorrelationData.getRetryCount();
    //  判断
    if (retryCount>=3){
        //  不需要重试了
        log.error("重试次数已到,发送消息失败:"+JSON.toJSONString(gmallCorrelationData));
    } else {
        //  变量更新
        retryCount+=1;
        //  重新赋值重试次数 第一次重试 0->1 1->2 2->3
        gmallCorrelationData.setRetryCount(retryCount);
        System.out.println("重试次数:\t"+retryCount);

        //  更新缓存中的数据
        this.redisTemplate.opsForValue().set(gmallCorrelationData.getId(),JSON.toJSONString(gmallCorrelationData),10, TimeUnit.MINUTES);

        //  调用发送消息方法 表示发送普通消息  发送消息的时候,不能调用 new RabbitService().sendMsg() 这个方法
            this.rabbitTemplate.convertAndSend(gmallCorrelationData.getExchange(),gmallCorrelationData.getRoutingKey(),gmallCorrelationData.getMessage(),gmallCorrelationData);
   
    }
}

测试:只需修改(错误信息)
在这里插入图片描述
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/873291.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

MySQL入门学习教程(二)

上一篇文章讲的是mysql的基本操作&#xff0c;这一篇会有一点难以理解&#xff0c;本节主要内容mysql视图&#xff0c;存储过程&#xff0c;函数&#xff0c;事务&#xff0c;触发器&#xff0c;以及动态执行sql 视图view 视图是一个虚拟表&#xff0c;其内容由查询定义。同真…

在Java中操作Redis(详细-->从环境配置到代码实现)

在Java中操作Redis 文章目录 在Java中操作Redis1、介绍2、Jedis3、Spring Data Redis3.1、对String的操作3.2、对哈希类型数据的操作3.3、对list的操作3.4、对set类型的操作3.5、对 ZSet类型的数据&#xff08;有序集合&#xff09;3.6、通用类型的操作 1、介绍 Redis 的Java客…

开发者必知:.gitignore 文件的魔法,助你管理项目文件如虎添翼!

前言&#xff1a; 在软件开发的世界中&#xff0c;版本控制是一个至关重要的环节。而 Git 作为目前最流行的分布式版本控制系统之一&#xff0c;已经成为开发者不可或缺的工具。然而&#xff0c;在日常的开发过程中&#xff0c;有些文件是不适合被纳入版本控制的&#xff0c;比…

【C++入门】const 成员函数

文章目录 一、基本概念二、经典问题三、使用建议 一、基本概念 const 修饰的成员函数就称作 const 成员函数。 例子&#xff1a; class Date { public:void Display() const{...}private:int _year;int _month;int _day; };事实上&#xff0c;const 成员函数的这个 const 修…

Linux文件系统管理

Linux文件系统管理 磁盘的组成与分区 计算机用于存取文件的硬件是磁盘&#xff0c;磁盘的组成主要有磁盘盘、机械手臂、磁盘读取头与主轴马达所组成&#xff0c; 而数据的写入其实是在磁盘盘上面。磁盘盘上面又可细分出扇区(Sector)与磁道(Track)两种单位&#xff0c; 其中扇区…

YOLOv5入门实践(3)— 手把手教你如何去划分数据集

前言&#xff1a;Hello大家好&#xff0c;我是小哥谈。数据集标注完成之后&#xff0c;下一步就是对这些数据集进行划分了。面对繁杂的数据集&#xff0c;如果手动划分的话&#xff0c;不仅麻烦而且不能保持随机性。本节课就给大家介绍一种方法&#xff0c;即使用代码去划分数据…

Express 实战(一):概览

在正式学习 Express 内容之前&#xff0c;我们有必要从大的方面了解一下 Node.js 。 在很长的一段时间里&#xff0c;JavaScript 一门编写浏览器中运行脚本的语言。不过近些年&#xff0c;随着互联网的发展以及技术进步&#xff0c;JavaScript 迎来了一个集中爆发的时代。一个…

思科交换机和路由器使用TFTP备份和还原配置文件

&#xff08;1&#xff09;给交换机配置管理地址&#xff0c;保证交换机与服务器相连通 SW1(config)#int vlan 1 SW1(config-if)#ip add 192.168.1.1 255.255.255.0 SW1(config-if)#no shut SW1#write &#xff08;2&#xff09;备份startup-config到服务器 SW1#copy startup…

【linux教程学习笔记】

目录 一. Linux系统目录结构 ​编辑 二. Linux文件基本属性 1. 文件属性分析 2. 更改文件属性 2.1. chgrp&#xff1a;change group&#xff0c;更改文件所属的组 2. chown&#xff1a;change owner&#xff0c;更改文件所属的用户&#xff0c;也可同时更改文件所属的组…

UG NX二次开发(C#)-CAM-获取刀具类型

文章目录 1、前言2、UG NX中的刀具类型3、获取刀具类型3.1 刀具类型帮助文档1、前言 在UG NX的加工模块,加工刀具是一个必要的因素,其包括了多种类型的类型,有铣刀、钻刀、车刀、磨刀、成型刀等等,而且每种刀具所包含的信息也各不相同。想获取刀具的信息,那就要知道刀具的…

php如何对接伪原创api

在了解伪原创api的各种应用形态之后&#xff0c;我们继续探讨智能写作背后的核心技术。需要说明的是&#xff0c;智能写作和自然语言生成、自然语言理解、知识图谱、多模算法等各类人工智能算法都有紧密的关联&#xff0c;在百度的智能写作实践中&#xff0c;常根据实际需求将多…

RT-Thread Smart 用户态开发体验

背景 RT-Thread Smart 是基于 RT-Thread 操作系统上的混合操作系统&#xff0c;它把应用从内核中独立出来&#xff0c;形成独立的用户态应用程序&#xff0c;并具备独立的地址空间。 自 V5.0.0 起&#xff0c;rt-smart 分支已合并至 master 分支上&#xff0c;下载 rt-thread …

2023年上半年数学建模竞赛题目汇总与难度分析

2023年上半年数学建模竞赛题目汇总与难度分析 ​由于近年来国赛ABC题出题方式漂浮不定&#xff0c;没有太大的定性&#xff0c;目前总体的命题方向为&#xff0c;由之前的单一模型问题变为数据分析评价优化或者预测类题目是B、C题的主要命题方向。为了更好地把握今年命题的主方…

快捷键使用技巧

IDEA生成序列化ID 1 CtrlAlts快捷键打开设置界面 2 选择Editor→Inspections&#xff0c;勾上serialVersionUID 3 每次实现序列化接口&#xff0c;可以鼠标点击类名&#xff0c;AltEnter快捷键导入序列化ID webstorm 快捷键重构 shiftf6 全局替换 通过快捷键CtrlShiftR打…

带扩散器的超快速控制网

一、说明 自从稳定扩散风靡全球以来&#xff0c;人们一直在寻找更好地控制生成过程结果的方法。ControlNet提供了一个最小的界面&#xff0c;允许用户在很大程度上自定义生成过程。使用 ControlNet&#xff0c;用户可以轻松地使用不同的空间上下文&#xff08;如深度图、分割图…

Cpp学习——vector模拟实现

vector简介 在模拟实现vector之前&#xff0c;首先就得知道vector是个啥&#xff1f;vector是个啥呢&#xff1f;vector是一个stl里面的容器&#xff0c;并且是一个模板容器。它就像是一个顺序表模板。还记得顺序表吧&#xff1f;之前我实现的顺序表只能弄整形的数据&#xff0…

深入篇【Linux】学习必备:进程理解(从底层探究进程概念/进程创建/进程状态/进程优先级)

深入篇【Linux】学习必备&#xff1a;进程理解(从底层探究进程概念/进程创建/进程状态/进程优先级&#xff09; 一.进程概念(PCB/task_struct)二.查看进程(top/ps)三.创建进程(fork)四.进程状态(僵尸进程/孤儿进程)五.进程优先级(PRI/NI) 一.进程概念(PCB/task_struct) 1.什么…

不同路径——力扣62

文章目录 题目描述解法一 动态规划题目描述 解法一 动态规划 int uniquePaths(int m, int n) {vector<vector

【Java】项目管理工具Maven的安装与使用

文章目录 1. Maven概述2. Maven的下载与安装2.1 下载2.2 安装 3. Maven仓库配置3.1 修改本地仓库配置3.2 修改远程仓库配置3.3 修改后的settings.xml 4. 使用Maven创建项目4.1 手工创建Java项目4.2 原型创建Java项目4.3 原型创建Web项目 5. Tomcat启动Web项目5.1 使用Tomcat插件…

LeetCode150道面试经典题-- 两数之和(简单)

1.题目 给定一个整数数组 nums 和一个整数目标值 target&#xff0c;请你在该数组中找出 和为目标值 target 的那 两个 整数&#xff0c;并返回它们的数组下标。 你可以假设每种输入只会对应一个答案。但是&#xff0c;数组中同一个元素在答案里不能重复出现。 你可以按任意…